Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
importer transfers cwrs without org
Browse files Browse the repository at this point in the history
  • Loading branch information
replay committed Jun 3, 2019
1 parent 0031044 commit aeb7734
Show file tree
Hide file tree
Showing 12 changed files with 799 additions and 160 deletions.
25 changes: 6 additions & 19 deletions cmd/mt-whisper-importer-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
var (
httpEndpoint = flag.String(
"http-endpoint",
"http://127.0.0.1:8080/chunks",
"http://127.0.0.1:8080/metrics/import",
"The http endpoint to send the data to",
)
namePrefix = flag.String(
Expand All @@ -48,11 +48,6 @@ var (
false,
"Defines if chunks that have not completed their chunk span should be written",
)
orgId = flag.Int(
"orgid",
1,
"Organization ID the data belongs to ",
)
insecureSSL = flag.Bool(
"insecure-ssl",
false,
Expand Down Expand Up @@ -305,14 +300,9 @@ func getMetric(w *whisper.Whisper, file, name string) (*mdata.ArchiveRequest, er
Time: 0,
Mtype: "gauge",
Tags: []string{},
OrgId: *orgId,
},
}
res.MetricData.SetId()
mkey, err := schema.MKeyFromString(res.MetricData.Id)
if err != nil {
panic(err)
}

_, selectedSchema := schemas.Match(res.MetricData.Name, int(w.Header.Archives[0].SecondsPerPoint))
conversion := newConversion(w.Header.Archives, points, method)
Expand All @@ -323,18 +313,15 @@ func getMetric(w *whisper.Whisper, file, name string) (*mdata.ArchiveRequest, er
continue
}

var amkey schema.AMKey
if retIdx == 0 {
amkey = schema.AMKey{MKey: mkey}
} else {
amkey = schema.GetAMKey(mkey, m, retention.ChunkSpan)
var archive schema.Archive
if retIdx > 0 {
archive = schema.NewArchive(m, retention.ChunkSpan)
}

encodedChunks := encodedChunksFromPoints(p, uint32(retention.SecondsPerPoint), retention.ChunkSpan)
for _, chunk := range encodedChunks {
res.ChunkWriteRequests = append(res.ChunkWriteRequests, mdata.NewChunkWriteRequest(
nil,
amkey,
res.ChunkWriteRequests = append(res.ChunkWriteRequests, mdata.NewChunkWriteRequestWithoutOrg(
archive,
uint32(retention.MaxRetention()),
chunk.Series.T0,
chunk.Encode(retention.ChunkSpan),
Expand Down
42 changes: 33 additions & 9 deletions cmd/mt-whisper-importer-writer/main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package main

import (
"errors"
"flag"
"fmt"
"net/http"
"os"
"strconv"
"strings"
"time"

Expand All @@ -27,7 +29,8 @@ import (
var (
confFile = flag.String("config", "/etc/metrictank/metrictank.ini", "configuration file path")
exitOnError = flag.Bool("exit-on-error", false, "Exit with a message when there's an error")
httpEndpoint = flag.String("http-endpoint", "127.0.0.1:8080", "The http endpoint to listen on")
listenAddress = flag.String("listen-address", "127.0.0.1", "The address to listen on")
listenPort = flag.Int("listen-port", 8080, "The port to listen on")
ttlsStr = flag.String("ttls", "35d", "list of ttl strings used by MT separated by ','")
partitionScheme = flag.String("partition-scheme", "bySeries", "method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)")
uriPath = flag.String("uri-path", "/chunks", "the URI on which we expect chunks to get posted")
Expand Down Expand Up @@ -137,21 +140,22 @@ func main() {

index.Init()

httpEndpoint := fmt.Sprintf("%s:%d", *listenAddress, *listenPort)
server := &Server{
partitioner: p,
index: index,
store: store,
HTTPServer: &http.Server{
Addr: *httpEndpoint,
Addr: httpEndpoint,
ReadTimeout: 10 * time.Minute,
},
}

http.HandleFunc(*uriPath, server.chunksHandler)
http.HandleFunc("/healthz", server.healthzHandler)

log.Infof("Listening on %q", *httpEndpoint)
err = http.ListenAndServe(*httpEndpoint, nil)
log.Infof("Listening on %q", httpEndpoint)
err = http.ListenAndServe(httpEndpoint, nil)
if err != nil {
panic(fmt.Sprintf("Error creating listener: %q", err))
}
Expand All @@ -173,8 +177,15 @@ func (s *Server) healthzHandler(w http.ResponseWriter, req *http.Request) {
}

func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {
orgId, err := getOrgId(req)
if err != nil {
w.WriteHeader(http.StatusForbidden)
w.Write([]byte(err.Error()))
return
}

data := mdata.ArchiveRequest{}
err := data.UnmarshalCompressed(req.Body)
err = data.UnmarshalCompressed(req.Body)
if err != nil {
throwError(w, fmt.Sprintf("Error decoding cwr stream: %q", err))
return
Expand All @@ -189,13 +200,15 @@ func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {
"Received %d cwrs for metric %s. The first has Key: %s, T0: %d, TTL: %d. The last has Key: %s, T0: %d, TTL: %d",
len(data.ChunkWriteRequests),
data.MetricData.Name,
data.ChunkWriteRequests[0].Key.String(),
data.ChunkWriteRequests[0].Archive.String(),
data.ChunkWriteRequests[0].T0,
data.ChunkWriteRequests[0].TTL,
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].Key.String(),
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].Archive.String(),
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].T0,
data.ChunkWriteRequests[len(data.ChunkWriteRequests)-1].TTL)

data.MetricData.OrgId = orgId
data.MetricData.SetId()
partition, err := s.partitioner.Partition(&data.MetricData, int32(*numPartitions))
if err != nil {
throwError(w, fmt.Sprintf("Error partitioning: %q", err))
Expand All @@ -210,7 +223,18 @@ func (s *Server) chunksHandler(w http.ResponseWriter, req *http.Request) {

s.index.AddOrUpdate(mkey, &data.MetricData, partition)
for _, cwr := range data.ChunkWriteRequests {
cwr := cwr // important because we pass by reference and this var will get overwritten in the next loop
s.store.Add(&cwr)
cwrWithOrg := cwr.GetChunkWriteRequest(nil, mkey)
s.store.Add(&cwrWithOrg)
}
}

func getOrgId(req *http.Request) (int, error) {
if orgIdStr := req.Header.Get("X-Org-Id"); len(orgIdStr) > 0 {
if orgId, err := strconv.Atoi(orgIdStr); err == nil {
return orgId, nil
} else {
return 0, fmt.Errorf("Invalid value in X-Org-Id header (%s): %s", orgIdStr, err)
}
}
return 0, errors.New("Missing X-Org-Id header")
}
10 changes: 5 additions & 5 deletions docs/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ Usage of ./mt-whisper-importer-reader:
-http-auth string
The credentials used to authenticate in the format "user:password"
-http-endpoint string
The http endpoint to send the data to (default "http://127.0.0.1:8080/chunks")
The http endpoint to send the data to (default "http://127.0.0.1:8080/metrics/import")
-import-after uint
Only import after the specified timestamp
-import-up-to uint
Expand All @@ -684,8 +684,6 @@ Usage of ./mt-whisper-importer-reader:
A regex pattern to be applied to all metric names, only matching ones will be imported
-name-prefix string
Prefix to prepend before every metric name, should include the '.' if necessary
-orgid int
Organization ID the data belongs to (default 1)
-position-file string
file to store position and load position from
-threads int
Expand All @@ -707,8 +705,10 @@ Usage of ./mt-whisper-importer-writer:
configuration file path (default "/etc/metrictank/metrictank.ini")
-exit-on-error
Exit with a message when there's an error
-http-endpoint string
The http endpoint to listen on (default "127.0.0.1:8080")
-listen-address string
The address to listen on (default "127.0.0.1")
-listen-port int
The port to listen on (default 8080)
-log-level string
log level. panic|fatal|error|warning|info|debug (default "info")
-num-partitions int
Expand Down
38 changes: 28 additions & 10 deletions mdata/cwr.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,48 @@ import (
"github.com/raintank/schema"
)

//go:generate msgp
//msgp:ignore ChunkWriteRequest

type ChunkSaveCallback func()

// ChunkWriteRequest is a request to write a chunk into a store
//go:generate msgp
type ChunkWriteRequest struct {
Callback ChunkSaveCallback `msg:"-"`
Key schema.AMKey `msg:"key,extension"`
ChunkWriteRequestPayload
Callback ChunkSaveCallback
Key schema.AMKey
}

func NewChunkWriteRequest(callback ChunkSaveCallback, key schema.AMKey, ttl, t0 uint32, data []byte, ts time.Time) ChunkWriteRequest {
return ChunkWriteRequest{ChunkWriteRequestPayload{ttl, t0, data, ts}, callback, key}
}

// ChunkWriteRequestWithoutOrg is used by the importer utility to send cwrs over the network
type ChunkWriteRequestWithoutOrg struct {
ChunkWriteRequestPayload
Archive schema.Archive
}

func NewChunkWriteRequestWithoutOrg(archive schema.Archive, ttl, t0 uint32, data []byte, ts time.Time) ChunkWriteRequestWithoutOrg {
return ChunkWriteRequestWithoutOrg{ChunkWriteRequestPayload{ttl, t0, data, ts}, archive}
}

func (c *ChunkWriteRequestWithoutOrg) GetChunkWriteRequest(callback ChunkSaveCallback, key schema.MKey) ChunkWriteRequest {
return ChunkWriteRequest{c.ChunkWriteRequestPayload, callback, schema.AMKey{MKey: key, Archive: c.Archive}}
}

type ChunkWriteRequestPayload struct {
TTL uint32
T0 uint32
Data []byte
Timestamp time.Time
}

// NewChunkWriteRequest creates a new ChunkWriteRequest
func NewChunkWriteRequest(callback ChunkSaveCallback, key schema.AMKey, ttl, t0 uint32, data []byte, ts time.Time) ChunkWriteRequest {
return ChunkWriteRequest{callback, key, ttl, t0, data, ts}
}

// ArchiveRequest is a complete representation of a Metric together with some
// chunk write requests containing data that shall be written into this metric
//go:generate msgp
type ArchiveRequest struct {
MetricData schema.MetricData
ChunkWriteRequests []ChunkWriteRequest
ChunkWriteRequests []ChunkWriteRequestWithoutOrg
}

func (a *ArchiveRequest) MarshalCompressed() (*bytes.Buffer, error) {
Expand Down
Loading

0 comments on commit aeb7734

Please sign in to comment.