Skip to content

Commit

Permalink
[query] Add graphite query path (#1308)
Browse files Browse the repository at this point in the history
Adds processing for carbon queries, handling the find and query endpoints.
  • Loading branch information
arnikola authored and Rob Skillington committed Jan 28, 2019
1 parent d6fd1e5 commit 78a776c
Show file tree
Hide file tree
Showing 81 changed files with 17,730 additions and 75 deletions.
11 changes: 7 additions & 4 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ import:
subpackages:
- cmp

- package: github.com/hydrogen18/stalecucumber
version: 9b38526d4bdf8e197c31344777fc28f7f48d250d

# START_PROMETHEUS_DEPS
- package: github.com/prometheus/prometheus
version: 998dfcbac689ae832ea64ca134fcb096f61a7f62
Expand Down
2 changes: 0 additions & 2 deletions scripts/development/m3_stack/m3coordinator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,3 @@ ingest:
handler:
protobufEnabled: true

carbon:
enabled: true
4 changes: 2 additions & 2 deletions scripts/docker-integration-tests/carbon/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ echo "foo.bar.baz 1 `date +%s`" | nc 0.0.0.0 7204
echo "Attempting to read carbon metric back"
function read_carbon {
end=$(date +%s)
start=$(($end-300))
RESPONSE=$(curl -sSfg "http://localhost:7201/api/v1/query_range?start=$start&end=$end&step=10&query={__graphite0__='foo',__graphite1__='bar',__graphite2__='baz'}")
start=$(($end-3000))
RESPONSE=$(curl -sSfg "http://localhost:7201/api/v1/query_range?start=$start&end=$end&step=10&query={__g0__='foo',__g1__='bar',__g2__='baz'}")
echo "$RESPONSE" | jq '.data.result[0].values[][1]=="1"' | grep -q "true"
return $?
}
Expand Down
35 changes: 6 additions & 29 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/m3db/m3/src/cmd/services/m3coordinator/ingest"
"github.com/m3db/m3/src/metrics/carbon"
"github.com/m3db/m3/src/query/graphite/graphite"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3x/instrument"
Expand All @@ -47,11 +48,6 @@ var (
carbonSeparatorByte = byte('.')
carbonSeparatorBytes = []byte{carbonSeparatorByte}

// Number of pre-formatted key names to generate in the init() function.
numPreFormattedKeyNames = 100
// Should never be modified after init().
preFormattedKeyNames = [][]byte{}

errCannotGenerateTagsFromEmptyName = errors.New("cannot generate tags from empty name")
errIOptsMustBeSet = errors.New("carbon ingester options: instrument options must be st")
errWorkerPoolMustBeSet = errors.New("carbon ingester options: worker pool must be set")
Expand Down Expand Up @@ -197,9 +193,9 @@ type carbonIngesterMetrics struct {
// key-value pair tags such that an input like:
// foo.bar.baz
// becomes
// __graphite0__:foo
// __graphite1__:bar
// __graphite2__:baz
// __g0__:foo
// __g1__:bar
// __g2__:baz
func GenerateTagsFromName(name []byte) (models.Tags, error) {
if len(name) == 0 {
return models.Tags{}, errCannotGenerateTagsFromEmptyName
Expand All @@ -219,7 +215,7 @@ func GenerateTagsFromName(name []byte) (models.Tags, error) {
}

tags = append(tags, models.Tag{
Name: getOrGenerateKeyName(tagNum),
Name: graphite.TagName(tagNum),
Value: name[startIdx:i],
})
startIdx = i + 1
Expand All @@ -238,29 +234,10 @@ func GenerateTagsFromName(name []byte) (models.Tags, error) {
// then the foor loop would have appended foo, bar, and baz already.
if name[len(name)-1] != carbonSeparatorByte {
tags = append(tags, models.Tag{
Name: getOrGenerateKeyName(tagNum),
Name: graphite.TagName(tagNum),
Value: name[startIdx:],
})
}

return models.Tags{Tags: tags}, nil
}

func getOrGenerateKeyName(idx int) []byte {
if idx < len(preFormattedKeyNames) {
return preFormattedKeyNames[idx]
}

return []byte(fmt.Sprintf("__graphite%d__", idx))
}

func generateKeyName(idx int) []byte {
return []byte(fmt.Sprintf("__graphite%d__", idx))
}

func init() {
for i := 0; i < numPreFormattedKeyNames; i++ {
keyName := generateKeyName(i)
preFormattedKeyNames = append(preFormattedKeyNames, keyName)
}
}
15 changes: 8 additions & 7 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"time"

"github.com/m3db/m3/src/cmd/services/m3coordinator/ingest"
"github.com/m3db/m3/src/query/graphite/graphite"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3x/instrument"
Expand Down Expand Up @@ -110,23 +111,23 @@ func TestGenerateTagsFromName(t *testing.T) {
{
name: "foo",
expectedTags: []models.Tag{
{Name: []byte("__graphite0__"), Value: []byte("foo")},
{Name: graphite.TagName(0), Value: []byte("foo")},
},
},
{
name: "foo.bar.baz",
expectedTags: []models.Tag{
{Name: []byte("__graphite0__"), Value: []byte("foo")},
{Name: []byte("__graphite1__"), Value: []byte("bar")},
{Name: []byte("__graphite2__"), Value: []byte("baz")},
{Name: graphite.TagName(0), Value: []byte("foo")},
{Name: graphite.TagName(1), Value: []byte("bar")},
{Name: graphite.TagName(2), Value: []byte("baz")},
},
},
{
name: "foo.bar.baz.",
expectedTags: []models.Tag{
{Name: []byte("__graphite0__"), Value: []byte("foo")},
{Name: []byte("__graphite1__"), Value: []byte("bar")},
{Name: []byte("__graphite2__"), Value: []byte("baz")},
{Name: graphite.TagName(0), Value: []byte("foo")},
{Name: graphite.TagName(1), Value: []byte("bar")},
{Name: graphite.TagName(2), Value: []byte("baz")},
},
},
{
Expand Down
197 changes: 197 additions & 0 deletions src/query/api/v1/handler/graphite/render.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package graphite

import (
"context"
"fmt"
"math"
"net/http"
"sort"
"sync"

"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/graphite/common"
"github.com/m3db/m3/src/query/graphite/errors"
"github.com/m3db/m3/src/query/graphite/native"
graphite "github.com/m3db/m3/src/query/graphite/storage"
"github.com/m3db/m3/src/query/graphite/ts"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/x/net/http"
)

const (
// ReadURL is the url for the graphite query handler.
ReadURL = handler.RoutePrefixV1 + "/graphite/render"
)

var (
// ReadHTTPMethods is the HTTP methods used with this resource.
ReadHTTPMethods = []string{http.MethodGet, http.MethodPost}
)

// A renderHandler implements the graphite /render endpoint, including full
// support for executing functions. It only works against data in M3.
type renderHandler struct {
engine *native.Engine
}

type respError struct {
err error
code int
}

// NewRenderHandler returns a new render handler around the given storage.
func NewRenderHandler(
storage storage.Storage,
) http.Handler {
wrappedStore := graphite.NewM3WrappedStorage(storage)
return &renderHandler{
engine: native.NewEngine(wrappedStore),
}
}

func sendError(errorCh chan error, err error) {
select {
case errorCh <- err:
default:
}
}

// ServeHTTP processes the render requests.
func (h *renderHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
respErr := h.serveHTTP(w, r)
if respErr.err != nil {
xhttp.Error(w, respErr.err, respErr.code)
}
}

func (h *renderHandler) serveHTTP(
w http.ResponseWriter,
r *http.Request,
) respError {
reqCtx := context.WithValue(r.Context(), handler.HeaderKey, r.Header)
p, err := ParseRenderRequest(r)
if err != nil {
return respError{err: err, code: http.StatusBadRequest}
}

var (
results = make([]ts.SeriesList, len(p.Targets))
errorCh = make(chan error, 1)
mu sync.Mutex
)

ctx := common.NewContext(common.ContextOptions{
Engine: h.engine,
Start: p.From,
End: p.Until,
Timeout: p.Timeout,
})

// Set the request context.
ctx.SetRequestContext(reqCtx)
defer ctx.Close()

var wg sync.WaitGroup
wg.Add(len(p.Targets))
for i, target := range p.Targets {
i, target := i, target
go func() {
// Log the query that causes us to panic.
defer func() {
if err := recover(); err != nil {
panic(fmt.Errorf("panic executing query '%s': %v", target, err))
}
}()

childCtx := ctx.NewChildContext(common.NewChildContextOptions())
defer func() {
childCtx.Close()
wg.Done()
}()

exp, err := h.engine.Compile(target)
if err != nil {
sendError(errorCh, errors.NewRenamedError(err,
fmt.Errorf("invalid 'target': %s => %s", target, err)))
return
}

targetSeries, err := exp.Execute(childCtx)
if err != nil {
sendError(errorCh, errors.NewRenamedError(err,
fmt.Errorf("error: target %s returned %s", target, err)))
return
}

for i, s := range targetSeries.Values {
if s.Len() <= int(p.MaxDataPoints) {
continue
}

var (
samplingMultiplier = math.Ceil(float64(s.Len()) / float64(p.MaxDataPoints))
newMillisPerStep = int(samplingMultiplier * float64(s.MillisPerStep()))
)
targetSeries.Values[i] = ts.LTTB(s, s.StartTime(), s.EndTime(), newMillisPerStep)
}

mu.Lock()
results[i] = targetSeries
mu.Unlock()
}()
}

wg.Wait()
close(errorCh)
err = <-errorCh
if err != nil {
return respError{err: err, code: http.StatusInternalServerError}
}

// Count and sort the groups if not sorted already.
// NB(r): For certain things like stacking different targets in Grafana
// returning targets in order matters to give a deterministic order for
// the series to display when stacking. However we should only mutate
// the order if no expressions have explicitly applied their own sort.
numSeries := 0
for _, r := range results {
numSeries += r.Len()
if !r.SortApplied {
sort.Stable(ts.SeriesByName(r.Values))
}
}

series := make([]*ts.Series, 0, numSeries)
for _, r := range results {
series = append(series, r.Values...)
}

// We've always sorted the response by this point
response := ts.SeriesList{
Values: series,
SortApplied: true,
}

err = WriteRenderResponse(w, response)
return respError{err: err, code: http.StatusOK}
}
Loading

0 comments on commit 78a776c

Please sign in to comment.