From 8b99e7fd8c46f29ca8d3ddb41add60a3d1d948ca Mon Sep 17 00:00:00 2001 From: Mauro Stettler Date: Tue, 11 Jun 2019 01:42:29 +0000 Subject: [PATCH] move ArchiveRequest instantiation into importer package the instantiation of the ArchiveRequest struct should be located together with the struct definition, this moves it there --- cmd/mt-whisper-importer-reader/main.go | 69 +----- mdata/importer/archive_request.go | 135 +++++++++++ mdata/importer/archive_request_gen.go | 182 +++++++++++++++ mdata/importer/archive_request_gen_test.go | 123 ++++++++++ mdata/importer/chunk_encoder.go | 2 +- mdata/importer/chunk_encoder_test.go | 4 +- mdata/importer/converter.go | 12 +- mdata/importer/converter_test.go | 58 ++--- mdata/importer/cwr.go | 58 ----- mdata/importer/cwr_gen.go | 259 --------------------- mdata/importer/cwr_gen_test.go | 113 --------- 11 files changed, 479 insertions(+), 536 deletions(-) create mode 100644 mdata/importer/archive_request.go create mode 100644 mdata/importer/archive_request_gen.go create mode 100644 mdata/importer/archive_request_gen_test.go diff --git a/cmd/mt-whisper-importer-reader/main.go b/cmd/mt-whisper-importer-reader/main.go index 83d3037009..ea91022312 100644 --- a/cmd/mt-whisper-importer-reader/main.go +++ b/cmd/mt-whisper-importer-reader/main.go @@ -21,7 +21,6 @@ import ( "github.com/grafana/metrictank/logger" "github.com/grafana/metrictank/mdata/importer" "github.com/kisielk/whisper-go/whisper" - "github.com/raintank/schema" log "github.com/sirupsen/logrus" ) @@ -160,7 +159,7 @@ func processFromChan(pos *posTracker, files chan string, wg *sync.WaitGroup) { name := getMetricName(file) log.Debugf("Processing file %s (%s)", file, name) - data, err := getMetric(w, file, name) + data, err := importer.NewArchiveRequest(w, schemas, file, name, uint32(*importFrom), uint32(*importUntil), *writeUnfinishedChunks) if err != nil { log.Errorf("Failed to get metric: %q", err.Error()) continue @@ -239,72 +238,6 @@ func getMetricName(file string) string { return *namePrefix + strings.Replace(strings.TrimSuffix(file, ".wsp"), "/", ".", -1) } -func getMetric(w *whisper.Whisper, file, name string) (*importer.ArchiveRequest, error) { - if len(w.Header.Archives) == 0 { - return nil, fmt.Errorf("Whisper file contains no archives: %q", file) - } - - method, err := importer.ConvertWhisperMethod(w.Header.Metadata.AggregationMethod) - if err != nil { - return nil, err - } - - points := make(map[int][]whisper.Point) - for i := range w.Header.Archives { - p, err := w.DumpArchive(i) - if err != nil { - return nil, fmt.Errorf("Failed to dump archive %d from whisper file %s", i, file) - } - points[i] = p - } - - res := &importer.ArchiveRequest{ - MetricData: schema.MetricData{ - Name: name, - Value: 0, - Interval: int(w.Header.Archives[0].SecondsPerPoint), - Unit: "unknown", - Time: 0, - Mtype: "gauge", - Tags: []string{}, - }, - } - res.MetricData.SetId() - - _, selectedSchema := schemas.Match(res.MetricData.Name, int(w.Header.Archives[0].SecondsPerPoint)) - converter := importer.NewConverter(w.Header.Archives, points, method, uint32(*importFrom), uint32(*importUntil)) - for retIdx, retention := range selectedSchema.Retentions { - convertedPoints := converter.GetPoints(retIdx, uint32(retention.SecondsPerPoint), uint32(retention.NumberOfPoints)) - for m, p := range convertedPoints { - if len(p) == 0 { - continue - } - - var archive schema.Archive - if retIdx > 0 { - archive = schema.NewArchive(m, retention.ChunkSpan) - } - - encodedChunks := importer.EncodeChunksFromPoints(p, uint32(retention.SecondsPerPoint), retention.ChunkSpan, *writeUnfinishedChunks) - for _, chunk := range encodedChunks { - res.ChunkWriteRequests = append(res.ChunkWriteRequests, importer.NewChunkWriteRequest( - archive, - uint32(retention.MaxRetention()), - chunk.Series.T0, - chunk.Encode(retention.ChunkSpan), - time.Now(), - )) - } - - if res.MetricData.Time < int64(p[len(p)-1].Timestamp) { - res.MetricData.Time = int64(p[len(p)-1].Timestamp) - } - } - } - - return res, nil -} - // scan a directory and feed the list of whisper files relative to base into the given channel func getFileListIntoChan(pos *posTracker, fileChan chan string) { filepath.Walk( diff --git a/mdata/importer/archive_request.go b/mdata/importer/archive_request.go new file mode 100644 index 0000000000..3a0117a5e9 --- /dev/null +++ b/mdata/importer/archive_request.go @@ -0,0 +1,135 @@ +package importer + +import ( + "bufio" + "bytes" + "compress/gzip" + "fmt" + "github.com/tinylib/msgp/msgp" + "io" + "time" + + "github.com/grafana/metrictank/conf" + "github.com/kisielk/whisper-go/whisper" + "github.com/raintank/schema" +) + +//go:generate msgp + +// ArchiveRequest is a complete representation of a Metric together with some +// chunk write requests containing data that shall be written into this metric +type ArchiveRequest struct { + MetricData schema.MetricData + ChunkWriteRequests []ChunkWriteRequest +} + +func NewArchiveRequest(w *whisper.Whisper, schemas conf.Schemas, file, name string, from, until uint32, writeUnfinishedChunks bool) (*ArchiveRequest, error) { + if len(w.Header.Archives) == 0 { + return nil, fmt.Errorf("Whisper file contains no archives: %q", file) + } + + method, err := convertWhisperMethod(w.Header.Metadata.AggregationMethod) + if err != nil { + return nil, err + } + + points := make(map[int][]whisper.Point) + for i := range w.Header.Archives { + p, err := w.DumpArchive(i) + if err != nil { + return nil, fmt.Errorf("Failed to dump archive %d from whisper file %s", i, file) + } + points[i] = p + } + + res := &ArchiveRequest{ + MetricData: schema.MetricData{ + Name: name, + Value: 0, + Interval: int(w.Header.Archives[0].SecondsPerPoint), + Unit: "unknown", + Time: 0, + Mtype: "gauge", + Tags: []string{}, + }, + } + res.MetricData.SetId() + + _, selectedSchema := schemas.Match(res.MetricData.Name, int(w.Header.Archives[0].SecondsPerPoint)) + converter := newConverter(w.Header.Archives, points, method, from, until) + for retIdx, retention := range selectedSchema.Retentions { + convertedPoints := converter.getPoints(retIdx, uint32(retention.SecondsPerPoint), uint32(retention.NumberOfPoints)) + for m, p := range convertedPoints { + if len(p) == 0 { + continue + } + + var archive schema.Archive + if retIdx > 0 { + archive = schema.NewArchive(m, retention.ChunkSpan) + } + + encodedChunks := encodeChunksFromPoints(p, uint32(retention.SecondsPerPoint), retention.ChunkSpan, writeUnfinishedChunks) + for _, chunk := range encodedChunks { + res.ChunkWriteRequests = append(res.ChunkWriteRequests, NewChunkWriteRequest( + archive, + uint32(retention.MaxRetention()), + chunk.Series.T0, + chunk.Encode(retention.ChunkSpan), + time.Now(), + )) + } + + if res.MetricData.Time < int64(p[len(p)-1].Timestamp) { + res.MetricData.Time = int64(p[len(p)-1].Timestamp) + } + } + } + + return res, nil +} + +func (a *ArchiveRequest) MarshalCompressed() (*bytes.Buffer, error) { + var buf bytes.Buffer + + buf.WriteByte(byte(uint8(1))) + + g := gzip.NewWriter(&buf) + err := msgp.Encode(g, a) + if err != nil { + return &buf, fmt.Errorf("ERROR: Encoding MGSP data: %q", err) + } + + err = g.Close() + if err != nil { + return &buf, fmt.Errorf("ERROR: Compressing MSGP data: %q", err) + } + + return &buf, nil +} + +func (a *ArchiveRequest) UnmarshalCompressed(b io.Reader) error { + versionBuf := make([]byte, 1) + readBytes, err := b.Read(versionBuf) + if err != nil || readBytes != 1 { + return fmt.Errorf("ERROR: Failed to read one byte: %s", err) + } + + version := uint8(versionBuf[0]) + if version != 1 { + return fmt.Errorf("ERROR: Only version 1 is supported, received version %d", version) + } + + gzipReader, err := gzip.NewReader(b) + if err != nil { + return fmt.Errorf("ERROR: Creating Gzip reader: %q", err) + } + + err = msgp.Decode(bufio.NewReader(gzipReader), a) + if err != nil { + return fmt.Errorf("ERROR: Unmarshaling Raw: %q", err) + } + gzipReader.Close() + + return nil +} diff --git a/mdata/importer/archive_request_gen.go b/mdata/importer/archive_request_gen.go new file mode 100644 index 0000000000..38e76993e5 --- /dev/null +++ b/mdata/importer/archive_request_gen.go @@ -0,0 +1,182 @@ +package importer + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *ArchiveRequest) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "MetricData": + err = z.MetricData.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "MetricData") + return + } + case "ChunkWriteRequests": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "ChunkWriteRequests") + return + } + if cap(z.ChunkWriteRequests) >= int(zb0002) { + z.ChunkWriteRequests = (z.ChunkWriteRequests)[:zb0002] + } else { + z.ChunkWriteRequests = make([]ChunkWriteRequest, zb0002) + } + for za0001 := range z.ChunkWriteRequests { + err = z.ChunkWriteRequests[za0001].DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "ChunkWriteRequests", za0001) + return + } + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *ArchiveRequest) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 2 + // write "MetricData" + err = en.Append(0x82, 0xaa, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x44, 0x61, 0x74, 0x61) + if err != nil { + return + } + err = z.MetricData.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "MetricData") + return + } + // write "ChunkWriteRequests" + err = en.Append(0xb2, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.ChunkWriteRequests))) + if err != nil { + err = msgp.WrapError(err, "ChunkWriteRequests") + return + } + for za0001 := range z.ChunkWriteRequests { + err = z.ChunkWriteRequests[za0001].EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "ChunkWriteRequests", za0001) + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *ArchiveRequest) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 2 + // string "MetricData" + o = append(o, 0x82, 0xaa, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x44, 0x61, 0x74, 0x61) + o, err = z.MetricData.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "MetricData") + return + } + // string "ChunkWriteRequests" + o = append(o, 0xb2, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.ChunkWriteRequests))) + for za0001 := range z.ChunkWriteRequests { + o, err = z.ChunkWriteRequests[za0001].MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "ChunkWriteRequests", za0001) + return + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *ArchiveRequest) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "MetricData": + bts, err = z.MetricData.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "MetricData") + return + } + case "ChunkWriteRequests": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ChunkWriteRequests") + return + } + if cap(z.ChunkWriteRequests) >= int(zb0002) { + z.ChunkWriteRequests = (z.ChunkWriteRequests)[:zb0002] + } else { + z.ChunkWriteRequests = make([]ChunkWriteRequest, zb0002) + } + for za0001 := range z.ChunkWriteRequests { + bts, err = z.ChunkWriteRequests[za0001].UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "ChunkWriteRequests", za0001) + return + } + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *ArchiveRequest) Msgsize() (s int) { + s = 1 + 11 + z.MetricData.Msgsize() + 19 + msgp.ArrayHeaderSize + for za0001 := range z.ChunkWriteRequests { + s += z.ChunkWriteRequests[za0001].Msgsize() + } + return +} diff --git a/mdata/importer/archive_request_gen_test.go b/mdata/importer/archive_request_gen_test.go new file mode 100644 index 0000000000..722715068d --- /dev/null +++ b/mdata/importer/archive_request_gen_test.go @@ -0,0 +1,123 @@ +package importer + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "bytes" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func TestMarshalUnmarshalArchiveRequest(t *testing.T) { + v := ArchiveRequest{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgArchiveRequest(b *testing.B) { + v := ArchiveRequest{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgArchiveRequest(b *testing.B) { + v := ArchiveRequest{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalArchiveRequest(b *testing.B) { + v := ArchiveRequest{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeArchiveRequest(t *testing.T) { + v := ArchiveRequest{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Logf("WARNING: Msgsize() for %v is inaccurate", v) + } + + vn := ArchiveRequest{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeArchiveRequest(b *testing.B) { + v := ArchiveRequest{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeArchiveRequest(b *testing.B) { + v := ArchiveRequest{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/mdata/importer/chunk_encoder.go b/mdata/importer/chunk_encoder.go index 94f42b9928..a04d5f6e93 100644 --- a/mdata/importer/chunk_encoder.go +++ b/mdata/importer/chunk_encoder.go @@ -7,7 +7,7 @@ import ( "github.com/kisielk/whisper-go/whisper" ) -func EncodeChunksFromPoints(points []whisper.Point, intervalIn, chunkSpan uint32, writeUnfinishedChunks bool) []*chunk.Chunk { +func encodeChunksFromPoints(points []whisper.Point, intervalIn, chunkSpan uint32, writeUnfinishedChunks bool) []*chunk.Chunk { var point whisper.Point var t0, prevT0 uint32 var c *chunk.Chunk diff --git a/mdata/importer/chunk_encoder_test.go b/mdata/importer/chunk_encoder_test.go index b964376c59..fba8b57cea 100644 --- a/mdata/importer/chunk_encoder_test.go +++ b/mdata/importer/chunk_encoder_test.go @@ -11,7 +11,7 @@ func TestEncodedChunksFromPointsWithUnfinished(t *testing.T) { points := generatePoints(25200, 10, 10, 0, 8640, func(i float64) float64 { return i + 1 }) expectedCount := 8640 // count including unfinished chunks - chunks := EncodeChunksFromPoints(points, 10, 21600, true) + chunks := encodeChunksFromPoints(points, 10, 21600, true) if len(chunks) != 5 { t.Fatalf("Expected to get 5 chunks, but got %d", len(chunks)) @@ -48,7 +48,7 @@ func TestEncodedChunksFromPointsWithoutUnfinished(t *testing.T) { points := generatePoints(25200, 10, 10, 0, 8640, func(i float64) float64 { return i + 1 }) expectedCount := 8640 - (2520 % 2160) // count minus what would end up in an unfinished chunk - chunks := EncodeChunksFromPoints(points, 10, 21600, false) + chunks := encodeChunksFromPoints(points, 10, 21600, false) if len(chunks) != 4 { t.Fatalf("Expected to get 4 chunks, but got %d", len(chunks)) diff --git a/mdata/importer/converter.go b/mdata/importer/converter.go index ee254a7e2d..34b89a0e2b 100644 --- a/mdata/importer/converter.go +++ b/mdata/importer/converter.go @@ -9,7 +9,7 @@ import ( "github.com/raintank/schema" ) -type Converter struct { +type converter struct { archives []whisper.ArchiveInfo points map[int][]whisper.Point method schema.Method @@ -19,12 +19,12 @@ type Converter struct { const fakeAvg schema.Method = 255 -func NewConverter(arch []whisper.ArchiveInfo, points map[int][]whisper.Point, method schema.Method, from, until uint32) *Converter { - return &Converter{archives: arch, points: points, method: method, from: from, until: until} +func newConverter(arch []whisper.ArchiveInfo, points map[int][]whisper.Point, method schema.Method, from, until uint32) *converter { + return &converter{archives: arch, points: points, method: method, from: from, until: until} } // generates points according to specified parameters by finding and using the best archives as input -func (c *Converter) GetPoints(retIdx int, spp, nop uint32) map[schema.Method][]whisper.Point { +func (c *converter) getPoints(retIdx int, spp, nop uint32) map[schema.Method][]whisper.Point { res := make(map[schema.Method][]whisper.Point) if len(c.points) == 0 { @@ -111,7 +111,7 @@ func (c *Converter) GetPoints(retIdx int, spp, nop uint32) map[schema.Method][]w return res } -func (c *Converter) findSmallestLargestArchive(spp, nop uint32) (int, int) { +func (c *converter) findSmallestLargestArchive(spp, nop uint32) (int, int) { // find smallest archive that still contains enough data to satisfy requested range largestArchiveIdx := len(c.archives) - 1 for i := largestArchiveIdx; i >= 0; i-- { @@ -282,7 +282,7 @@ func sortPoints(points pointSorter) pointSorter { return points } -func ConvertWhisperMethod(whisperMethod whisper.AggregationMethod) (schema.Method, error) { +func convertWhisperMethod(whisperMethod whisper.AggregationMethod) (schema.Method, error) { switch whisperMethod { case whisper.AggregationAverage: return schema.Avg, nil diff --git a/mdata/importer/converter_test.go b/mdata/importer/converter_test.go index 1ffb3d274e..988cc9fd41 100644 --- a/mdata/importer/converter_test.go +++ b/mdata/importer/converter_test.go @@ -704,7 +704,7 @@ func verifyPointMaps(t *testing.T, points map[schema.Method][]whisper.Point, exp } func TestPointsConversionSum1(t *testing.T) { - c := conversion{ + c := converter{ archives: []whisper.ArchiveInfo{ {SecondsPerPoint: 1, Points: 2}, {SecondsPerPoint: 2, Points: 2}, @@ -755,9 +755,9 @@ func TestPointsConversionSum1(t *testing.T) { }, } - points1 := c.GetPoints(0, 1, 8) - points2 := c.GetPoints(0, 2, 4) - points3 := c.GetPoints(0, 4, 2) + points1 := c.getPoints(0, 1, 8) + points2 := c.getPoints(0, 2, 4) + points3 := c.getPoints(0, 4, 2) verifyPointMaps(t, points1, expectedPoints1) verifyPointMaps(t, points2, expectedPoints2) @@ -765,7 +765,7 @@ func TestPointsConversionSum1(t *testing.T) { } func TestPointsConversionLast1(t *testing.T) { - c := conversion{ + c := converter{ archives: []whisper.ArchiveInfo{ {SecondsPerPoint: 1, Points: 2}, {SecondsPerPoint: 2, Points: 2}, @@ -816,9 +816,9 @@ func TestPointsConversionLast1(t *testing.T) { }, } - points1 := c.GetPoints(0, 1, 8) - points2 := c.GetPoints(0, 2, 4) - points3 := c.GetPoints(0, 4, 2) + points1 := c.getPoints(0, 1, 8) + points2 := c.getPoints(0, 2, 4) + points3 := c.getPoints(0, 4, 2) verifyPointMaps(t, points1, expectedPoints1) verifyPointMaps(t, points2, expectedPoints2) @@ -826,7 +826,7 @@ func TestPointsConversionLast1(t *testing.T) { } func TestPointsConversionSum2(t *testing.T) { - c := conversion{ + c := converter{ archives: []whisper.ArchiveInfo{ {SecondsPerPoint: 1, Points: 8}, {SecondsPerPoint: 2, Points: 8}, @@ -939,9 +939,9 @@ func TestPointsConversionSum2(t *testing.T) { }, } - points1 := c.GetPoints(0, 1, 32) - points2 := c.GetPoints(1, 2, 16) - points3 := c.GetPoints(2, 4, 8) + points1 := c.getPoints(0, 1, 32) + points2 := c.getPoints(1, 2, 16) + points3 := c.getPoints(2, 4, 8) verifyPointMaps(t, points1, expectedPoints1) verifyPointMaps(t, points2, expectedPoints2) @@ -949,7 +949,7 @@ func TestPointsConversionSum2(t *testing.T) { } func TestPointsConversionAvg1(t *testing.T) { - c := conversion{ + c := converter{ archives: []whisper.ArchiveInfo{ {SecondsPerPoint: 1, Points: 2}, {SecondsPerPoint: 2, Points: 2}, @@ -1088,18 +1088,18 @@ func TestPointsConversionAvg1(t *testing.T) { }, } - points1_0 := c.GetPoints(0, 1, 8) - points2_0 := c.GetPoints(0, 2, 4) - points3_0 := c.GetPoints(0, 4, 2) + points1_0 := c.getPoints(0, 1, 8) + points2_0 := c.getPoints(0, 2, 4) + points3_0 := c.getPoints(0, 4, 2) - points1_1 := c.GetPoints(1, 1, 8) - points2_1 := c.GetPoints(1, 2, 4) - points3_1 := c.GetPoints(1, 4, 100) + points1_1 := c.getPoints(1, 1, 8) + points2_1 := c.getPoints(1, 2, 4) + points3_1 := c.getPoints(1, 4, 100) c.until = 1503407723 - points1_2 := c.GetPoints(1, 1, 8) - points2_2 := c.GetPoints(1, 2, 100) - points3_2 := c.GetPoints(1, 4, 8) + points1_2 := c.getPoints(1, 1, 8) + points2_2 := c.getPoints(1, 2, 100) + points3_2 := c.getPoints(1, 4, 8) c.until = math.MaxUint32 verifyPointMaps(t, points1_0, expectedPoints1_0) @@ -1116,7 +1116,7 @@ func TestPointsConversionAvg1(t *testing.T) { } func TestPointsConversionAvg2(t *testing.T) { - c := conversion{ + c := converter{ archives: []whisper.ArchiveInfo{ {SecondsPerPoint: 1, Points: 3}, {SecondsPerPoint: 3, Points: 3}, @@ -1297,13 +1297,13 @@ func TestPointsConversionAvg2(t *testing.T) { }, } - points1_0 := c.GetPoints(0, 1, 27) - points2_0 := c.GetPoints(0, 3, 100) - points3_0 := c.GetPoints(0, 9, 100) + points1_0 := c.getPoints(0, 1, 27) + points2_0 := c.getPoints(0, 3, 100) + points3_0 := c.getPoints(0, 9, 100) - points1_1 := c.GetPoints(1, 1, 27) - points2_1 := c.GetPoints(1, 3, 9) - points3_1 := c.GetPoints(1, 9, 3) + points1_1 := c.getPoints(1, 1, 27) + points2_1 := c.getPoints(1, 3, 9) + points3_1 := c.getPoints(1, 9, 3) verifyPointMaps(t, points1_0, expectedPoints1_0) verifyPointMaps(t, points2_0, expectedPoints2_0) diff --git a/mdata/importer/cwr.go b/mdata/importer/cwr.go index 36f148fd41..5ae244da9e 100644 --- a/mdata/importer/cwr.go +++ b/mdata/importer/cwr.go @@ -1,16 +1,10 @@ package importer import ( - "bufio" - "bytes" - "compress/gzip" - "fmt" - "io" "time" "github.com/grafana/metrictank/mdata" "github.com/raintank/schema" - "github.com/tinylib/msgp/msgp" ) //go:generate msgp @@ -29,55 +23,3 @@ func NewChunkWriteRequest(archive schema.Archive, ttl, t0 uint32, data []byte, t func (c *ChunkWriteRequest) GetChunkWriteRequest(callback mdata.ChunkSaveCallback, key schema.MKey) mdata.ChunkWriteRequest { return mdata.ChunkWriteRequest{c.ChunkWriteRequestPayload, callback, schema.AMKey{MKey: key, Archive: c.Archive}} } - -// ArchiveRequest is a complete representation of a Metric together with some -// chunk write requests containing data that shall be written into this metric -type ArchiveRequest struct { - MetricData schema.MetricData - ChunkWriteRequests []ChunkWriteRequest -} - -func (a *ArchiveRequest) MarshalCompressed() (*bytes.Buffer, error) { - var buf bytes.Buffer - - buf.WriteByte(byte(uint8(1))) - - g := gzip.NewWriter(&buf) - err := msgp.Encode(g, a) - if err != nil { - return &buf, fmt.Errorf("ERROR: Encoding MGSP data: %q", err) - } - - err = g.Close() - if err != nil { - return &buf, fmt.Errorf("ERROR: Compressing MSGP data: %q", err) - } - - return &buf, nil -} - -func (a *ArchiveRequest) UnmarshalCompressed(b io.Reader) error { - versionBuf := make([]byte, 1) - readBytes, err := b.Read(versionBuf) - if err != nil || readBytes != 1 { - return fmt.Errorf("ERROR: Failed to read one byte: %s", err) - } - - version := uint8(versionBuf[0]) - if version != 1 { - return fmt.Errorf("ERROR: Only version 1 is supported, received version %d", version) - } - - gzipReader, err := gzip.NewReader(b) - if err != nil { - return fmt.Errorf("ERROR: Creating Gzip reader: %q", err) - } - - err = msgp.Decode(bufio.NewReader(gzipReader), a) - if err != nil { - return fmt.Errorf("ERROR: Unmarshaling Raw: %q", err) - } - gzipReader.Close() - - return nil -} diff --git a/mdata/importer/cwr_gen.go b/mdata/importer/cwr_gen.go index 2406a32d52..9d257cd591 100644 --- a/mdata/importer/cwr_gen.go +++ b/mdata/importer/cwr_gen.go @@ -6,265 +6,6 @@ import ( "github.com/tinylib/msgp/msgp" ) -// DecodeMsg implements msgp.Decodable -func (z *ArchiveRequest) DecodeMsg(dc *msgp.Reader) (err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, err = dc.ReadMapHeader() - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, err = dc.ReadMapKeyPtr() - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "MetricData": - err = z.MetricData.DecodeMsg(dc) - if err != nil { - err = msgp.WrapError(err, "MetricData") - return - } - case "ChunkWriteRequests": - var zb0002 uint32 - zb0002, err = dc.ReadArrayHeader() - if err != nil { - err = msgp.WrapError(err, "ChunkWriteRequests") - return - } - if cap(z.ChunkWriteRequests) >= int(zb0002) { - z.ChunkWriteRequests = (z.ChunkWriteRequests)[:zb0002] - } else { - z.ChunkWriteRequests = make([]ChunkWriteRequest, zb0002) - } - for za0001 := range z.ChunkWriteRequests { - var zb0003 uint32 - zb0003, err = dc.ReadMapHeader() - if err != nil { - err = msgp.WrapError(err, "ChunkWriteRequests", za0001) - return - } - for zb0003 > 0 { - zb0003-- - field, err = dc.ReadMapKeyPtr() - if err != nil { - err = msgp.WrapError(err, "ChunkWriteRequests", za0001) - return - } - switch msgp.UnsafeString(field) { - case "ChunkWriteRequestPayload": - err = z.ChunkWriteRequests[za0001].ChunkWriteRequestPayload.DecodeMsg(dc) - if err != nil { - err = msgp.WrapError(err, "ChunkWriteRequests", za0001, "ChunkWriteRequestPayload") - return - } - case "Archive": - err = z.ChunkWriteRequests[za0001].Archive.DecodeMsg(dc) - if err != nil { - err = msgp.WrapError(err, "ChunkWriteRequests", za0001, "Archive") - return - } - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err, "ChunkWriteRequests", za0001) - return - } - } - } - } - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - return -} - -// EncodeMsg implements msgp.Encodable -func (z *ArchiveRequest) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 2 - // write "MetricData" - err = en.Append(0x82, 0xaa, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x44, 0x61, 0x74, 0x61) - if err != nil { - return - } - err = z.MetricData.EncodeMsg(en) - if err != nil { - err = msgp.WrapError(err, "MetricData") - return - } - // write "ChunkWriteRequests" - err = en.Append(0xb2, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73) - if err != nil { - return - } - err = en.WriteArrayHeader(uint32(len(z.ChunkWriteRequests))) - if err != nil { - err = msgp.WrapError(err, "ChunkWriteRequests") - return - } - for za0001 := range z.ChunkWriteRequests { - // map header, size 2 - // write "ChunkWriteRequestPayload" - err = en.Append(0x82, 0xb8, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64) - if err != nil { - return - } - err = z.ChunkWriteRequests[za0001].ChunkWriteRequestPayload.EncodeMsg(en) - if err != nil { - err = msgp.WrapError(err, "ChunkWriteRequests", za0001, "ChunkWriteRequestPayload") - return - } - // write "Archive" - err = en.Append(0xa7, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65) - if err != nil { - return - } - err = z.ChunkWriteRequests[za0001].Archive.EncodeMsg(en) - if err != nil { - err = msgp.WrapError(err, "ChunkWriteRequests", za0001, "Archive") - return - } - } - return -} - -// MarshalMsg implements msgp.Marshaler -func (z *ArchiveRequest) MarshalMsg(b []byte) (o []byte, err error) { - o = msgp.Require(b, z.Msgsize()) - // map header, size 2 - // string "MetricData" - o = append(o, 0x82, 0xaa, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x44, 0x61, 0x74, 0x61) - o, err = z.MetricData.MarshalMsg(o) - if err != nil { - err = msgp.WrapError(err, "MetricData") - return - } - // string "ChunkWriteRequests" - o = append(o, 0xb2, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73) - o = msgp.AppendArrayHeader(o, uint32(len(z.ChunkWriteRequests))) - for za0001 := range z.ChunkWriteRequests { - // map header, size 2 - // string "ChunkWriteRequestPayload" - o = append(o, 0x82, 0xb8, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x57, 0x72, 0x69, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64) - o, err = z.ChunkWriteRequests[za0001].ChunkWriteRequestPayload.MarshalMsg(o) - if err != nil { - err = msgp.WrapError(err, "ChunkWriteRequests", za0001, "ChunkWriteRequestPayload") - return - } - // string "Archive" - o = append(o, 0xa7, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65) - o, err = z.ChunkWriteRequests[za0001].Archive.MarshalMsg(o) - if err != nil { - err = msgp.WrapError(err, "ChunkWriteRequests", za0001, "Archive") - return - } - } - return -} - -// UnmarshalMsg implements msgp.Unmarshaler -func (z *ArchiveRequest) UnmarshalMsg(bts []byte) (o []byte, err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, bts, err = msgp.ReadMapKeyZC(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "MetricData": - bts, err = z.MetricData.UnmarshalMsg(bts) - if err != nil { - err = msgp.WrapError(err, "MetricData") - return - } - case "ChunkWriteRequests": - var zb0002 uint32 - zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err, "ChunkWriteRequests") - return - } - if cap(z.ChunkWriteRequests) >= int(zb0002) { - z.ChunkWriteRequests = (z.ChunkWriteRequests)[:zb0002] - } else { - z.ChunkWriteRequests = make([]ChunkWriteRequest, zb0002) - } - for za0001 := range z.ChunkWriteRequests { - var zb0003 uint32 - zb0003, bts, err = msgp.ReadMapHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err, "ChunkWriteRequests", za0001) - return - } - for zb0003 > 0 { - zb0003-- - field, bts, err = msgp.ReadMapKeyZC(bts) - if err != nil { - err = msgp.WrapError(err, "ChunkWriteRequests", za0001) - return - } - switch msgp.UnsafeString(field) { - case "ChunkWriteRequestPayload": - bts, err = z.ChunkWriteRequests[za0001].ChunkWriteRequestPayload.UnmarshalMsg(bts) - if err != nil { - err = msgp.WrapError(err, "ChunkWriteRequests", za0001, "ChunkWriteRequestPayload") - return - } - case "Archive": - bts, err = z.ChunkWriteRequests[za0001].Archive.UnmarshalMsg(bts) - if err != nil { - err = msgp.WrapError(err, "ChunkWriteRequests", za0001, "Archive") - return - } - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err, "ChunkWriteRequests", za0001) - return - } - } - } - } - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - o = bts - return -} - -// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z *ArchiveRequest) Msgsize() (s int) { - s = 1 + 11 + z.MetricData.Msgsize() + 19 + msgp.ArrayHeaderSize - for za0001 := range z.ChunkWriteRequests { - s += 1 + 25 + z.ChunkWriteRequests[za0001].ChunkWriteRequestPayload.Msgsize() + 8 + z.ChunkWriteRequests[za0001].Archive.Msgsize() - } - return -} - // DecodeMsg implements msgp.Decodable func (z *ChunkWriteRequest) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte diff --git a/mdata/importer/cwr_gen_test.go b/mdata/importer/cwr_gen_test.go index 168ed0dc91..36a1846fa2 100644 --- a/mdata/importer/cwr_gen_test.go +++ b/mdata/importer/cwr_gen_test.go @@ -9,119 +9,6 @@ import ( "github.com/tinylib/msgp/msgp" ) -func TestMarshalUnmarshalArchiveRequest(t *testing.T) { - v := ArchiveRequest{} - bts, err := v.MarshalMsg(nil) - if err != nil { - t.Fatal(err) - } - left, err := v.UnmarshalMsg(bts) - if err != nil { - t.Fatal(err) - } - if len(left) > 0 { - t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) - } - - left, err = msgp.Skip(bts) - if err != nil { - t.Fatal(err) - } - if len(left) > 0 { - t.Errorf("%d bytes left over after Skip(): %q", len(left), left) - } -} - -func BenchmarkMarshalMsgArchiveRequest(b *testing.B) { - v := ArchiveRequest{} - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - v.MarshalMsg(nil) - } -} - -func BenchmarkAppendMsgArchiveRequest(b *testing.B) { - v := ArchiveRequest{} - bts := make([]byte, 0, v.Msgsize()) - bts, _ = v.MarshalMsg(bts[0:0]) - b.SetBytes(int64(len(bts))) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - bts, _ = v.MarshalMsg(bts[0:0]) - } -} - -func BenchmarkUnmarshalArchiveRequest(b *testing.B) { - v := ArchiveRequest{} - bts, _ := v.MarshalMsg(nil) - b.ReportAllocs() - b.SetBytes(int64(len(bts))) - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := v.UnmarshalMsg(bts) - if err != nil { - b.Fatal(err) - } - } -} - -func TestEncodeDecodeArchiveRequest(t *testing.T) { - v := ArchiveRequest{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - - m := v.Msgsize() - if buf.Len() > m { - t.Logf("WARNING: Msgsize() for %v is inaccurate", v) - } - - vn := ArchiveRequest{} - err := msgp.Decode(&buf, &vn) - if err != nil { - t.Error(err) - } - - buf.Reset() - msgp.Encode(&buf, &v) - err = msgp.NewReader(&buf).Skip() - if err != nil { - t.Error(err) - } -} - -func BenchmarkEncodeArchiveRequest(b *testing.B) { - v := ArchiveRequest{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - b.SetBytes(int64(buf.Len())) - en := msgp.NewWriter(msgp.Nowhere) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - v.EncodeMsg(en) - } - en.Flush() -} - -func BenchmarkDecodeArchiveRequest(b *testing.B) { - v := ArchiveRequest{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - b.SetBytes(int64(buf.Len())) - rd := msgp.NewEndlessReader(buf.Bytes(), b) - dc := msgp.NewReader(rd) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - err := v.DecodeMsg(dc) - if err != nil { - b.Fatal(err) - } - } -} - func TestMarshalUnmarshalChunkWriteRequest(t *testing.T) { v := ChunkWriteRequest{} bts, err := v.MarshalMsg(nil)