This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Make the importer utilities rely on TSDB-GW for authentication and org-association #1335

Merged
6 commits merged on Jun 17, 2019
172 changes: 9 additions & 163 deletions cmd/mt-whisper-importer-reader/main.go
@@ -12,25 +12,22 @@ import (
"os"
"path/filepath"
"regexp"
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/grafana/metrictank/conf"
"github.com/grafana/metrictank/logger"
"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/mdata/chunk"
"github.com/grafana/metrictank/mdata/importer"
"github.com/kisielk/whisper-go/whisper"
"github.com/raintank/schema"
log "github.com/sirupsen/logrus"
)

var (
httpEndpoint = flag.String(
"http-endpoint",
"http://127.0.0.1:8080/chunks",
"http://127.0.0.1:8080/metrics/import",
"The http endpoint to send the data to",
)
namePrefix = flag.String(
@@ -48,11 +45,6 @@ var (
false,
"Defines if chunks that have not completed their chunk span should be written",
)
orgId = flag.Int(
"orgid",
1,
"Organization ID the data belongs to ",
)
insecureSSL = flag.Bool(
"insecure-ssl",
false,
@@ -78,15 +70,15 @@ var (
"",
"A regex pattern to be applied to all metric names, only matching ones will be imported",
)
importUpTo = flag.Uint(
"import-up-to",
importUntil = flag.Uint(
"import-until",
math.MaxUint32,
"Only import up to the specified timestamp",
"Only import up to, but not including, the specified timestamp",
)
importAfter = flag.Uint(
"import-after",
importFrom = flag.Uint(
"import-from",
0,
"Only import after the specified timestamp",
"Only import starting from the specified timestamp",
)
positionFile = flag.String(
"position-file",
@@ -167,7 +159,7 @@ func processFromChan(pos *posTracker, files chan string, wg *sync.WaitGroup) {

name := getMetricName(file)
log.Debugf("Processing file %s (%s)", file, name)
data, err := getMetric(w, file, name)
data, err := importer.NewArchiveRequest(w, schemas, file, name, uint32(*importFrom), uint32(*importUntil), *writeUnfinishedChunks)
if err != nil {
log.Errorf("Failed to get metric: %q", err.Error())
continue
@@ -246,152 +238,6 @@ func getMetricName(file string) string {
return *namePrefix + strings.Replace(strings.TrimSuffix(file, ".wsp"), "/", ".", -1)
}

// pointSorter sorts points by timestamp
type pointSorter []whisper.Point

func (a pointSorter) Len() int { return len(a) }
func (a pointSorter) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a pointSorter) Less(i, j int) bool { return a[i].Timestamp < a[j].Timestamp }

// the whisper archives are organized like a ringbuffer. since we need to
// insert the points into the chunks in order we first need to sort them
func sortPoints(points pointSorter) pointSorter {
sort.Sort(points)
return points
}

func convertWhisperMethod(whisperMethod whisper.AggregationMethod) (schema.Method, error) {
switch whisperMethod {
case whisper.AggregationAverage:
return schema.Avg, nil
case whisper.AggregationSum:
return schema.Sum, nil
case whisper.AggregationLast:
return schema.Lst, nil
case whisper.AggregationMax:
return schema.Max, nil
case whisper.AggregationMin:
return schema.Min, nil
default:
return 0, fmt.Errorf("Unknown whisper method: %d", whisperMethod)
}
}

func getMetric(w *whisper.Whisper, file, name string) (*mdata.ArchiveRequest, error) {
if len(w.Header.Archives) == 0 {
return nil, fmt.Errorf("Whisper file contains no archives: %q", file)
}

method, err := convertWhisperMethod(w.Header.Metadata.AggregationMethod)
if err != nil {
return nil, err
}

points := make(map[int][]whisper.Point)
for i := range w.Header.Archives {
p, err := w.DumpArchive(i)
if err != nil {
return nil, fmt.Errorf("Failed to dump archive %d from whisper file %s", i, file)
}
points[i] = p
}

res := &mdata.ArchiveRequest{
MetricData: schema.MetricData{
Name: name,
Value: 0,
Interval: int(w.Header.Archives[0].SecondsPerPoint),
Unit: "unknown",
Time: 0,
Mtype: "gauge",
Tags: []string{},
OrgId: *orgId,
},
}
res.MetricData.SetId()
mkey, err := schema.MKeyFromString(res.MetricData.Id)
if err != nil {
panic(err)
}

_, selectedSchema := schemas.Match(res.MetricData.Name, int(w.Header.Archives[0].SecondsPerPoint))
conversion := newConversion(w.Header.Archives, points, method)
for retIdx, retention := range selectedSchema.Retentions {
convertedPoints := conversion.getPoints(retIdx, uint32(retention.SecondsPerPoint), uint32(retention.NumberOfPoints))
for m, p := range convertedPoints {
if len(p) == 0 {
continue
}

var amkey schema.AMKey
if retIdx == 0 {
amkey = schema.AMKey{MKey: mkey}
} else {
amkey = schema.GetAMKey(mkey, m, retention.ChunkSpan)
}

encodedChunks := encodedChunksFromPoints(p, uint32(retention.SecondsPerPoint), retention.ChunkSpan)
for _, chunk := range encodedChunks {
res.ChunkWriteRequests = append(res.ChunkWriteRequests, mdata.NewChunkWriteRequest(
nil,
amkey,
uint32(retention.MaxRetention()),
chunk.Series.T0,
chunk.Encode(retention.ChunkSpan),
time.Now(),
))
}

if res.MetricData.Time < int64(p[len(p)-1].Timestamp) {
res.MetricData.Time = int64(p[len(p)-1].Timestamp)
}
}
}

return res, nil
}

func encodedChunksFromPoints(points []whisper.Point, intervalIn, chunkSpan uint32) []*chunk.Chunk {
var point whisper.Point
var t0, prevT0 uint32
var c *chunk.Chunk
var encodedChunks []*chunk.Chunk

for _, point = range points {
// this shouldn't happen, but if it would we better catch it here because Metrictank wouldn't handle it well:
// https://github.com/grafana/metrictank/blob/f1868cccfb92fc82cd853914af958f6d187c5f74/mdata/aggmetric.go#L378
if point.Timestamp == 0 {
continue
}

t0 = point.Timestamp - (point.Timestamp % chunkSpan)
if prevT0 == 0 {
c = chunk.New(t0)
prevT0 = t0
} else if prevT0 != t0 {
c.Finish()
encodedChunks = append(encodedChunks, c)

c = chunk.New(t0)
prevT0 = t0
}

err := c.Push(point.Timestamp, point.Value)
if err != nil {
panic(fmt.Sprintf("ERROR: Failed to push value into chunk at t0 %d: %q", t0, err))
}
}

// if the last written point was also the last one of the current chunk,
// or if writeUnfinishedChunks is on, we close the chunk and push it
if point.Timestamp == t0+chunkSpan-intervalIn || *writeUnfinishedChunks {
c.Finish()
encodedChunks = append(encodedChunks, c)
}

return encodedChunks
}

// scan a directory and feed the list of whisper files relative to base into the given channel
func getFileListIntoChan(pos *posTracker, fileChan chan string) {
filepath.Walk(
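The reader-side change above replaces the local whisper-to-chunk conversion with the shared importer package and drops the -orgid flag, since org association now happens in tsdb-gw. The sketch below illustrates that flow end to end; it is not part of the diff, the helper name convertAndSend is made up, and it assumes whisper.Open plus an ArchiveRequest.MarshalCompressed counterpart to the UnmarshalCompressed call used by the writer.

```go
package main

import (
	"fmt"
	"math"
	"net/http"

	"github.com/grafana/metrictank/conf"
	"github.com/grafana/metrictank/mdata/importer"
	"github.com/kisielk/whisper-go/whisper"
)

// convertAndSend converts one whisper file into an ArchiveRequest (as the
// reader now does via importer.NewArchiveRequest) and posts it to the tsdb-gw
// import endpoint. The reader no longer knows the org id: tsdb-gw validates
// the credentials and attaches X-Org-Id before proxying to the writer.
// Hypothetical helper; whisper.Open and MarshalCompressed are assumptions.
func convertAndSend(path, name, endpoint, user, pass string, schemas conf.Schemas) error {
	w, err := whisper.Open(path)
	if err != nil {
		return err
	}
	defer w.Close()

	// same argument order as the call in processFromChan: from/until bound the
	// imported time range, the final bool controls unfinished chunks
	req, err := importer.NewArchiveRequest(w, schemas, path, name, 0, math.MaxUint32, false)
	if err != nil {
		return err
	}

	body, err := req.MarshalCompressed() // assumed counterpart of UnmarshalCompressed
	if err != nil {
		return err
	}

	httpReq, err := http.NewRequest("POST", endpoint, body)
	if err != nil {
		return err
	}
	httpReq.SetBasicAuth(user, pass) // credentials from the -http-auth "user:password" flag

	resp, err := http.DefaultClient.Do(httpReq)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("import endpoint returned %s", resp.Status)
	}
	return nil
}
```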
39 changes: 31 additions & 8 deletions cmd/mt-whisper-importer-writer/main.go
@@ -1,10 +1,12 @@
package main

import (
"errors"
"flag"
"fmt"
"net/http"
"os"
"strconv"
"strings"
"time"

@@ -20,17 +22,18 @@ import (
"github.com/grafana/metrictank/idx/cassandra"
"github.com/grafana/metrictank/logger"
"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/mdata/importer"
bigTableStore "github.com/grafana/metrictank/store/bigtable"
cassandraStore "github.com/grafana/metrictank/store/cassandra"
)

var (
confFile = flag.String("config", "/etc/metrictank/metrictank.ini", "configuration file path")
exitOnError = flag.Bool("exit-on-error", false, "Exit with a message when there's an error")
httpEndpoint = flag.String("http-endpoint", "127.0.0.1:8080", "The http endpoint to listen on")
httpEndpoint = flag.String("http-endpoint", "0.0.0.0:8080", "The http endpoint to listen on")
ttlsStr = flag.String("ttls", "35d", "list of ttl strings used by MT separated by ','")
partitionScheme = flag.String("partition-scheme", "bySeries", "method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)")
uriPath = flag.String("uri-path", "/chunks", "the URI on which we expect chunks to get posted")
uriPath = flag.String("uri-path", "/metrics/import", "the URI on which we expect chunks to get posted")
numPartitions = flag.Int("num-partitions", 1, "Number of Partitions")
logLevel = flag.String("log-level", "info", "log level. panic|fatal|error|warning|info|debug")

@@ -173,8 +176,15 @@ func (s *Server) healthzHandler(w http.ResponseWriter, req *http.Request) {
}

func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {
data := mdata.ArchiveRequest{}
err := data.UnmarshalCompressed(req.Body)
orgId, err := getOrgId(req)
if err != nil {
w.WriteHeader(http.StatusForbidden)
w.Write([]byte(err.Error()))
return
}

data := importer.ArchiveRequest{}
err = data.UnmarshalCompressed(req.Body)
if err != nil {
throwError(w, fmt.Sprintf("Error decoding cwr stream: %q", err))
return
@@ -189,13 +199,15 @@ func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {
"Received %d cwrs for metric %s. The first has Key: %s, T0: %d, TTL: %d. The last has Key: %s, T0: %d, TTL: %d",
len(data.ChunkWriteRequests),
data.MetricData.Name,
data.ChunkWriteRequests[0].Key.String(),
data.ChunkWriteRequests[0].Archive.String(),
data.ChunkWriteRequests[0].T0,
data.ChunkWriteRequests[0].TTL,
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].Key.String(),
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].Archive.String(),
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].T0,
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].TTL)

data.MetricData.OrgId = orgId
data.MetricData.SetId()
partition, err := s.partitioner.Partition(&data.MetricData, int32(*numPartitions))
if err != nil {
throwError(w, fmt.Sprintf("Error partitioning: %q", err))
@@ -210,7 +222,18 @@ func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {

s.index.AddOrUpdate(mkey, &data.MetricData, partition)
for _, cwr := range data.ChunkWriteRequests {
cwr := cwr // important because we pass by reference and this var will get overwritten in the next loop
s.store.Add(&cwr)
cwrWithOrg := cwr.GetChunkWriteRequest(nil, mkey)
s.store.Add(&cwrWithOrg)
}
}

func getOrgId(req *http.Request) (int, error) {
if orgIdStr := req.Header.Get("X-Org-Id"); len(orgIdStr) > 0 {
if orgId, err := strconv.Atoi(orgIdStr); err == nil {
return orgId, nil
} else {
return 0, fmt.Errorf("Invalid value in X-Org-Id header (%s): %s", orgIdStr, err)
}
}
return 0, errors.New("Missing X-Org-Id header")
}
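The new getOrgId helper is the whole org-association contract on the writer side: tsdb-gw authenticates the caller and injects X-Org-Id before proxying, so a request arriving without the header is answered with 403 by chunksHandler. A minimal test-style sketch of that contract (not part of the PR; assumed to live in the same package as the writer):

```go
package main

import (
	"net/http/httptest"
	"testing"
)

func TestGetOrgId(t *testing.T) {
	// request as it would arrive straight from an unauthenticated client
	bare := httptest.NewRequest("POST", "/metrics/import", nil)
	if _, err := getOrgId(bare); err == nil {
		t.Fatal("expected a request without X-Org-Id to be rejected")
	}

	// request as tsdb-gw would forward it after validating the caller's key
	proxied := httptest.NewRequest("POST", "/metrics/import", nil)
	proxied.Header.Set("X-Org-Id", "23")
	orgId, err := getOrgId(proxied)
	if err != nil || orgId != 23 {
		t.Fatalf("expected org 23, got %d (%v)", orgId, err)
	}
}
```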
16 changes: 7 additions & 9 deletions docs/tools.md
@@ -673,19 +673,17 @@ Usage of ./mt-whisper-importer-reader:
-http-auth string
The credentials used to authenticate in the format "user:password"
-http-endpoint string
The http endpoint to send the data to (default "http://127.0.0.1:8080/chunks")
-import-after uint
Only import after the specified timestamp
-import-up-to uint
Only import up to the specified timestamp (default 4294967295)
The http endpoint to send the data to (default "http://127.0.0.1:8080/metrics/import")
-import-from uint
Only import starting from the specified timestamp
-import-until uint
Only import up to, but not including, the specified timestamp (default 4294967295)
-insecure-ssl
Disables ssl certificate verification
-name-filter string
A regex pattern to be applied to all metric names, only matching ones will be imported
-name-prefix string
Prefix to prepend before every metric name, should include the '.' if necessary
-orgid int
Organization ID the data belongs to (default 1)
-position-file string
file to store position and load position from
-threads int
@@ -708,7 +706,7 @@ Usage of ./mt-whisper-importer-writer:
-exit-on-error
Exit with a message when there's an error
-http-endpoint string
The http endpoint to listen on (default "127.0.0.1:8080")
The http endpoint to listen on (default "0.0.0.0:8080")
-log-level string
log level. panic|fatal|error|warning|info|debug (default "info")
-num-partitions int
@@ -718,6 +716,6 @@ Usage of ./mt-whisper-importer-writer:
-ttls string
list of ttl strings used by MT separated by ',' (default "35d")
-uri-path string
the URI on which we expect chunks to get posted (default "/chunks")
the URI on which we expect chunks to get posted (default "/metrics/import")
```
