Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Update Graphite consolidateBy function to match original implementation for maxDataPoints and "last" aggregation #3602

Merged
merged 5 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 66 additions & 4 deletions scripts/docker-integration-tests/carbon/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ docker-compose -f ${COMPOSE_FILE} up -d dbnode01
docker-compose -f ${COMPOSE_FILE} up -d coordinator01

# Think of this as a defer func() in golang
METRIC_EMIT_PID="-1"
function defer {
docker-compose -f ${COMPOSE_FILE} down || echo "unable to shutdown containers" # CI fails to stop all containers sometimes
if [ "$METRIC_EMIT_PID" != "-1" ]; then
echo "Kill metric emit process"
kill $METRIC_EMIT_PID
fi
}
trap defer EXIT

Expand All @@ -26,15 +31,51 @@ function read_carbon {
expected_val=$2
end=$(date +%s)
start=$(($end-1000))
RESPONSE=$(curl -sSfg "http://localhost:7201/api/v1/graphite/render?target=$target&from=$start&until=$end")
test "$(echo "$RESPONSE" | jq ".[0].datapoints | .[][0] | select(. != null)" | jq -s last)" = "$expected_val"
if [ "$3" != "" ]; then
start=$3
fi
if [ "$4" != "" ]; then
end=$4
fi
params=$5
if [ "$params" != "" ]; then
params="&${params}"
fi

RESPONSE=$(curl -sSfg "http://localhost:7201/api/v1/graphite/render?target=${target}&from=${start}&until=${end}${params}")

expr="last_non_null"
if [ "$6" != "" ]; then
expr="$6"
fi

if [ "$expr" = "last_non_null" ]; then
test "$(echo "$RESPONSE" | jq ".[0].datapoints | .[][0] | select(. != null)" | jq -s last)" = "$expected_val"
return $?
fi

if [ "$expr" = "max_non_null" ]; then
test "$(echo "$RESPONSE" | jq "[ .[0].datapoints | .[] | select(.[0] != null) | .[0] ] | max")" = "$expected_val"
return $?
fi

return 1
}

function wait_carbon_values_accumulated {
target=$1
expected_val=$2
end=$(date +%s)
start=$(($end-1000))
RESPONSE=$(curl -sSfg "http://localhost:7201/api/v1/graphite/render?target=${target}&from=${start}&until=${end}")
test "$(echo "$RESPONSE" | jq "[ .[0].datapoints | .[][0] | select(. != null) ] | length")" -gt "$expected_val"
return $?
}

function find_carbon {
query=$1
expected_file=$2
RESPONSE=$(curl -sSg "http://localhost:7201/api/v1/graphite/metrics/find?query=$query")
RESPONSE=$(curl -sSg "http://localhost:7201/api/v1/graphite/metrics/find?query=${query}")
ACTUAL=$(echo $RESPONSE | jq '. | sort')
EXPECTED=$(cat $EXPECTED_PATH/$expected_file | jq '. | sort')
if [ "$ACTUAL" == "$EXPECTED" ]
Expand Down Expand Up @@ -98,7 +139,28 @@ ATTEMPTS=2 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff "read_carbon 'sum(**pos1-a
ATTEMPTS=2 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff "read_carbon 'sum(**pos2-1**)' 3"
ATTEMPTS=2 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff "read_carbon 'sum(**pos2-1)' 3"

# Test basic cases
# Test consolidateBy function correctly changes behavior of downsampling
# Send metric values 42 every 5 seconds
echo "Sending unaggregated carbon metrics to m3coordinator"
bash -c 'while true; do t=$(date +%s); echo "stat.already-aggregated.foo 42 $t" | nc 0.0.0.0 7204; sleep 5; done' &

# Track PID to kill on exit
METRIC_EMIT_PID="$!"

# Wait until there's at least four values accumulated
ATTEMPTS=20 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff "wait_carbon_values_accumulated 'stat.already-aggregated.foo' 4"

# Now test the max datapoints behavior using max of four datapoints (4x 5s resolution = 20s)
end=$(date +%s)
start=$(($end-20))
# 1. no max datapoints set, should not adjust number of datapoints coming back
ATTEMPTS=2 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff "read_carbon 'stat.already-aggregated.foo' 42 $start $end"
# 2. max datapoints with LTTB, should be an existing value (i.e. 42)
ATTEMPTS=2 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff "read_carbon 'stat.already-aggregated.foo' 42 $start $end 'maxDataPoints=2' 'max_non_null'"
# 3. max datapoints with consolidateBy(.., aggFunction), should be resized according to function
ATTEMPTS=2 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff "read_carbon 'consolidateBy(stat.already-aggregated.foo,\"sum\")' 84 $start $end 'maxDataPoints=2' 'max_non_null'"

# Test basic find cases
t=$(date +%s)
echo "a 0 $t" | nc 0.0.0.0 7204
echo "a.bar 0 $t" | nc 0.0.0.0 7204
Expand Down
23 changes: 11 additions & 12 deletions src/query/api/v1/handler/graphite/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package graphite

import (
"fmt"
"math"
"net/http"
"sort"
"sync"
Expand Down Expand Up @@ -126,11 +125,12 @@ func (h *renderHandler) serveHTTP(
)

ctx := common.NewContext(common.ContextOptions{
Engine: h.engine,
Start: p.From,
End: p.Until,
Timeout: p.Timeout,
Limit: limit,
Engine: h.engine,
Start: p.From,
End: p.Until,
Timeout: p.Timeout,
Limit: limit,
MaxDataPoints: p.MaxDataPoints,
})

// Set the request context.
Expand Down Expand Up @@ -178,16 +178,15 @@ func (h *renderHandler) serveHTTP(
return
}

// Apply LTTB downsampling to any series that hasn't been resized
// to fit max datapoints explicitly using "consolidateBy" function.
for i, s := range targetSeries.Values {
if s.Len() <= int(p.MaxDataPoints) {
resizeMillisPerStep, needResize := s.ResizeToMaxDataPointsMillisPerStep(p.MaxDataPoints)
if !needResize {
continue
}

var (
samplingMultiplier = math.Ceil(float64(s.Len()) / float64(p.MaxDataPoints))
newMillisPerStep = int(samplingMultiplier * float64(s.MillisPerStep()))
)
targetSeries.Values[i] = ts.LTTB(s, s.StartTime(), s.EndTime(), newMillisPerStep)
targetSeries.Values[i] = ts.LTTB(s, s.StartTime(), s.EndTime(), resizeMillisPerStep)
}

mu.Lock()
Expand Down
15 changes: 10 additions & 5 deletions src/query/graphite/common/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type contextBase struct {
// Source is the query source.
Source []byte

// MaxDataPoints is the max datapoints for the query.
MaxDataPoints int64

parent *Context
reqCtx ctx.Context
storageContext context.Context
Expand All @@ -73,11 +76,12 @@ type Context struct {

// ContextOptions provides the options to create the context with
type ContextOptions struct {
Start time.Time
End time.Time
Engine QueryEngine
Timeout time.Duration
Limit int
Start time.Time
End time.Time
Engine QueryEngine
Timeout time.Duration
Limit int
MaxDataPoints int64
}

// TimeRangeAdjustment is an applied time range adjustment.
Expand All @@ -100,6 +104,7 @@ func NewContext(options ContextOptions) *Context {
storageContext: context.New(),
Timeout: options.Timeout,
Limit: options.Limit,
MaxDataPoints: options.MaxDataPoints,
},
}
}
Expand Down
22 changes: 18 additions & 4 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2640,7 +2640,11 @@ func cactiStyle(_ *common.Context, seriesList singlePathSpec) (ts.SeriesList, er
// the number of data points to be graphed, m3 consolidates the values to
// to prevent line overlap. The consolidateBy() function changes the consolidation
// function from the default of "average" to one of "sum", "max", or "min".
func consolidateBy(_ *common.Context, seriesList singlePathSpec, consolidationApproach string) (ts.SeriesList, error) {
func consolidateBy(
ctx *common.Context,
seriesList singlePathSpec,
consolidationApproach string,
) (ts.SeriesList, error) {
ca := ts.ConsolidationApproach(consolidationApproach)
cf, ok := ca.SafeFunc()
if !ok {
Expand All @@ -2651,9 +2655,19 @@ func consolidateBy(_ *common.Context, seriesList singlePathSpec, consolidationAp
results := make([]*ts.Series, 0, len(seriesList.Values))
for _, series := range seriesList.Values {
newName := fmt.Sprintf("consolidateBy(%s,%q)", series.Name(), consolidationApproach)
renamed := series.RenamedTo(newName)
renamed.SetConsolidationFunc(cf)
results = append(results, renamed)

newSeries := series.RenamedTo(newName)
newSeries.SetConsolidationFunc(cf)

// Check if needs to resized datapoints to fit max data points so that
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/resized/resize

// default LTTB downsampling is not applied if downsampling needs to
// occur when series is rendered.
resizedSeries, resized := newSeries.ResizeToMaxDataPoints(ctx.MaxDataPoints, cf)
if resized {
newSeries = resizedSeries
}

results = append(results, newSeries)
}

r := ts.SeriesList(seriesList)
Expand Down
96 changes: 66 additions & 30 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4077,41 +4077,77 @@ func TestCactiStyle(t *testing.T) {
}

func TestConsolidateBy(t *testing.T) {
ctx := common.NewTestContext()
defer func() { _ = ctx.Close() }()

start := common.NewTestContext().StartTime
stepSize := 10000
input := struct {
name string
startTime time.Time
stepInMilli int
values []float64
for i, test := range []struct {
name string
fn string
startTime time.Time
stepMillis int
maxDataPoints int64
values []float64
expectedValues []float64
expectedStepMillis int
expectedErr bool
}{
"foo",
ctx.StartTime,
stepSize,
[]float64{1.0, 2.0, 3.0, 4.0, math.NaN()},
}
{
name: "foo",
fn: "min",
startTime: start,
stepMillis: stepSize,
values: []float64{1.0, 2.0, 3.0, 4.0, math.NaN()},
expectedStepMillis: stepSize,
expectedValues: []float64{1.0, 2.0, 3.0, 4.0, math.NaN()},
},
{
name: "foo",
fn: "min",
startTime: start,
stepMillis: stepSize,
maxDataPoints: 2,
values: []float64{1.0, 2.0, 3.0, 4.0, math.NaN()},
expectedStepMillis: 3 * stepSize,
expectedValues: []float64{1.0, 4.0},
},
{
name: "foo",
fn: "nonexistent",
startTime: start,
stepMillis: stepSize,
values: []float64{1.0, 2.0, 3.0, 4.0, math.NaN()},
expectedErr: true,
},
} {
input := test
t.Run(fmt.Sprintf("%d-%s", i, input.name), func(t *testing.T) {
ctx := common.NewTestContext()
ctx.MaxDataPoints = input.maxDataPoints
defer func() { _ = ctx.Close() }()

series := ts.NewSeries(
ctx,
input.name,
input.startTime,
common.NewTestSeriesValues(ctx, input.stepInMilli, input.values),
)
series := ts.NewSeries(
ctx,
input.name,
input.startTime,
common.NewTestSeriesValues(ctx, input.stepMillis, input.values),
)

results, err := consolidateBy(ctx, singlePathSpec{
Values: []*ts.Series{series},
}, "min")
expected := common.TestSeries{Name: `consolidateBy(foo,"min")`, Data: input.values}
require.Nil(t, err)
common.CompareOutputsAndExpected(t, input.stepInMilli, input.startTime,
[]common.TestSeries{expected}, results.Values)
results, err := consolidateBy(ctx, singlePathSpec{
Values: []*ts.Series{series},
}, input.fn)
if input.expectedErr {
require.Error(t, err)
return
}

results, err = consolidateBy(ctx, singlePathSpec{
Values: []*ts.Series{series},
}, "nonexistent")
require.NotNil(t, err)
expected := common.TestSeries{
Name: fmt.Sprintf(`consolidateBy(%s,"%s")`, input.name, input.fn),
Data: input.expectedValues,
}
require.NoError(t, err)
common.CompareOutputsAndExpected(t, input.expectedStepMillis, input.startTime,
[]common.TestSeries{expected}, results.Values)
})
}
}

func TestPow(t *testing.T) {
Expand Down
51 changes: 48 additions & 3 deletions src/query/graphite/ts/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,11 @@ func (v *resized) appender(timestamp time.Time, value float64) {

// IntersectAndResize returns a new time series with a different millisPerStep that spans the
// intersection of the underlying timeseries and the provided start and end time parameters
func (b *Series) IntersectAndResize(start, end time.Time, millisPerStep int,
stepAggregator ConsolidationFunc) (*Series, error) {
func (b *Series) IntersectAndResize(
start, end time.Time,
millisPerStep int,
stepAggregator ConsolidationFunc,
) (*Series, error) {
intersects, start, end := b.intersection(start, end)
if !intersects {
ts := NewSeries(b.ctx, b.name, start, &float64Values{
Expand All @@ -391,7 +394,14 @@ func (b *Series) IntersectAndResize(start, end time.Time, millisPerStep int,
if b.MillisPerStep() == millisPerStep {
return b.Slice(b.StepAtTime(start), b.StepAtTime(end))
}
return b.resized(start, end, millisPerStep, stepAggregator), nil
}

func (b *Series) resized(
start, end time.Time,
millisPerStep int,
stepAggregator ConsolidationFunc,
) *Series {
// TODO: This append based model completely screws pooling; need to rewrite to allow for pooling.
v := &resized{}
b.resizeStep(start, end, millisPerStep, stepAggregator, v.appender)
Expand All @@ -401,7 +411,42 @@ func (b *Series) IntersectAndResize(start, end time.Time, millisPerStep int,
numSteps: len(v.values),
})
ts.Specification = b.Specification
return ts, nil
return ts
}

// NeedsResizeToMaxDataPoints returns whether the series needs resizing to max datapoints.
func (b *Series) NeedsResizeToMaxDataPoints(maxDataPoints int64) bool {
if maxDataPoints <= 0 {
// No max datapoints specified.
return false
}
return int64(b.Len()) > maxDataPoints
}

// ResizeToMaxDataPointsMillisPerStep returns the new milliseconds per second
// required if a series needs resizing and true, or if does not need resize
// for max datapoints then it returns 0 and false.
func (b *Series) ResizeToMaxDataPointsMillisPerStep(
maxDataPoints int64,
) (int, bool) {
if !b.NeedsResizeToMaxDataPoints(maxDataPoints) {
return 0, false
}
samplingMultiplier := math.Ceil(float64(b.Len()) / float64(maxDataPoints))
return int(samplingMultiplier * float64(b.MillisPerStep())), true
}

// ResizeToMaxDataPoints resizes the series to fit max datapoints and returns
// true if a series was resized or false if it did not need to be resized.
func (b *Series) ResizeToMaxDataPoints(
maxDataPoints int64,
stepAggregator ConsolidationFunc,
) (*Series, bool) {
resizeMillisPerStep, needsResize := b.ResizeToMaxDataPointsMillisPerStep(maxDataPoints)
if !needsResize {
return nil, false
}
return b.resized(b.StartTime(), b.EndTime(), resizeMillisPerStep, stepAggregator), true
}

// A MutableSeries is a Series that allows updates
Expand Down