Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Graphite fetches truncated to resolution #1997

Merged
merged 4 commits into from
Oct 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions scripts/docker-integration-tests/carbon/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ t=$(date +%s)
echo "foo.min.aggregate.baz 41 $t" | nc 0.0.0.0 7204
echo "foo.min.aggregate.baz 42 $t" | nc 0.0.0.0 7204
echo "Attempting to read min aggregated carbon metric"
ATTEMPTS=10 TIMEOUT=1 retry_with_backoff read_carbon foo.min.aggregate.baz 41
ATTEMPTS=20 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff read_carbon foo.min.aggregate.baz 41

echo "Writing out a carbon metric that should not be aggregated"
t=$(date +%s)
Expand All @@ -64,15 +64,15 @@ t=$(date +%s)
echo "foo.min.already-aggregated.baz 42 $t" | nc 0.0.0.0 7204
echo "foo.min.already-aggregated.baz 43 $t" | nc 0.0.0.0 7204
echo "Attempting to read unaggregated carbon metric"
ATTEMPTS=10 TIMEOUT=1 retry_with_backoff read_carbon foo.min.already-aggregated.baz 43
ATTEMPTS=20 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff read_carbon foo.min.already-aggregated.baz 43

echo "Writing out a carbon metric that should should use the default mean aggregation"
t=$(date +%s)
# Mean of 10 and 20 is 15. Same comment as the min aggregation test above.
echo "foo.min.catch-all.baz 10 $t" | nc 0.0.0.0 7204
echo "foo.min.catch-all.baz 20 $t" | nc 0.0.0.0 7204
echo "Attempting to read mean aggregated carbon metric"
ATTEMPTS=10 TIMEOUT=1 retry_with_backoff read_carbon foo.min.catch-all.baz 15
ATTEMPTS=20 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff read_carbon foo.min.catch-all.baz 15

# Test writing and reading IDs with colons in them.
t=$(date +%s)
Expand Down
34 changes: 24 additions & 10 deletions src/query/api/v1/handler/graphite/render_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ func TestParseQueryNoResults(t *testing.T) {

func TestParseQueryResults(t *testing.T) {
mockStorage := mock.NewMockStorage()
start := time.Now().Add(-30 * time.Minute)
resolution := 10 * time.Second
truncateStart := time.Now().Add(-30 * time.Minute).Truncate(resolution)
start := truncateStart.Add(time.Second)
vals := ts.NewFixedStepValues(resolution, 3, 3, start)
tags := models.NewTags(0, nil)
tags = tags.AddTag(models.Tag{Name: graphite.TagName(0), Value: []byte("foo")})
Expand Down Expand Up @@ -104,10 +105,11 @@ func TestParseQueryResults(t *testing.T) {

buf, err := ioutil.ReadAll(res.Body)
require.NoError(t, err)
exTimestamp := truncateStart.Unix() + 10
expected := fmt.Sprintf(
`[{"target":"series_name","datapoints":[[3.000000,%d],`+
`[3.000000,%d],[3.000000,%d]],"step_size_ms":%d}]`,
start.Unix(), start.Unix()+10, start.Unix()+20, resolution/time.Millisecond)
`[3.000000,%d],[null,%d]],"step_size_ms":%d}]`,
exTimestamp, exTimestamp+10, exTimestamp+20, resolution/time.Millisecond)

require.Equal(t, expected, string(buf))
}
Expand Down Expand Up @@ -138,7 +140,10 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) {
models.QueryContextOptions{}, nil, instrument.NewOptions())

req := newGraphiteReadHTTPRequest(t)
req.URL.RawQuery = "target=foo.bar&from=" + startStr + "&until=" + endStr + "&maxDataPoints=1"
req.URL.RawQuery = fmt.Sprintf(
"target=foo.bar&from=%s&until=%s&maxDataPoints=1",
startStr, endStr,
)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)

Expand All @@ -148,18 +153,23 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) {
buf, err := ioutil.ReadAll(res.Body)
require.NoError(t, err)

// Expected resolution should be in milliseconds and subsume all datapoints.
exStep := end.Sub(start) / time.Millisecond
expected := fmt.Sprintf(
`[{"target":"a","datapoints":[[4.000000,%d]],"step_size_ms":%d}]`,
start.Unix(), end.Sub(start)/time.Millisecond)
start.Unix(), exStep)

require.Equal(t, expected, string(buf))
}

func TestParseQueryResultsMultiTarget(t *testing.T) {
mockStorage := mock.NewMockStorage()
minsAgo := 12
start := time.Now().Add(-1 * time.Duration(minsAgo) * time.Minute)
resolution := 10 * time.Second
start := time.Now().
Add(-1 * time.Duration(minsAgo) * time.Minute).
Truncate(resolution)

vals := ts.NewFixedStepValues(resolution, 3, 3, start)
seriesList := ts.SeriesList{
ts.NewSeries([]byte("a"), vals, models.NewTags(0, nil)),
Expand All @@ -175,8 +185,10 @@ func TestParseQueryResultsMultiTarget(t *testing.T) {
models.QueryContextOptions{}, nil, instrument.NewOptions())

req := newGraphiteReadHTTPRequest(t)
req.URL.RawQuery = fmt.Sprintf("target=foo.bar&target=baz.qux&from=%d&until=%d",
start.Unix(), start.Unix()+30)
req.URL.RawQuery = fmt.Sprintf(
"target=foo.bar&target=baz.qux&from=%d&until=%d",
start.Unix(), start.Unix()+30,
)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)

Expand Down Expand Up @@ -228,8 +240,10 @@ func TestParseQueryResultsMultiTargetWithLimits(t *testing.T) {
models.QueryContextOptions{}, nil, instrument.NewOptions())

req := newGraphiteReadHTTPRequest(t)
req.URL.RawQuery = fmt.Sprintf("target=foo.bar&target=bar.baz&from=%d&until=%d",
start.Unix(), start.Unix()+30)
req.URL.RawQuery = fmt.Sprintf(
"target=foo.bar&target=bar.baz&from=%d&until=%d",
start.Unix(), start.Unix()+30,
)
recorder := httptest.NewRecorder()
h.ServeHTTP(recorder, req)

Expand Down
33 changes: 30 additions & 3 deletions src/query/graphite/storage/m3_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"errors"
"fmt"
"math"
"time"

"github.com/m3db/m3/src/query/block"
Expand Down Expand Up @@ -120,6 +121,24 @@ func translateQuery(query string, opts FetchOptions) (*storage.FetchQuery, error
}, nil
}

func truncateBoundsToResolution(
start time.Time,
end time.Time,
resolution time.Duration,
) (time.Time, time.Time) {
truncatedStart := start.Truncate(resolution)
// NB: if truncated start matches start, it's already valid.
if truncatedStart.Before(start) {
start = truncatedStart.Add(resolution)
}

length := float64(end.Sub(truncatedStart))
steps := math.Floor(length / float64(resolution))
truncatedLength := time.Duration(steps) * resolution
end = start.Add(truncatedLength)
return start, end
}

func translateTimeseries(
ctx xctx.Context,
result *storage.FetchResult,
Expand All @@ -139,15 +158,23 @@ func translateTimeseries(
return nil, errSeriesNoResolution
}

start, end := truncateBoundsToResolution(start, end, resolution)
length := int(end.Sub(start) / resolution)
millisPerStep := int(resolution / time.Millisecond)
values := ts.NewValues(ctx, millisPerStep, length)
for _, datapoint := range m3series.Values().Datapoints() {
index := int(datapoint.Timestamp.Sub(start) / resolution)
if index < 0 || index >= length {
// Outside of range requested
ts := datapoint.Timestamp
if ts.Before(start) {
// Outside of range requested.
continue
}

if !ts.Before(end) {
// No more valid datapoints.
break
}

index := int(datapoint.Timestamp.Sub(start) / resolution)
values.SetValueAt(index, datapoint.Value)
}

Expand Down
50 changes: 38 additions & 12 deletions src/query/graphite/storage/m3_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,16 @@ func TestTranslateQueryTrailingDot(t *testing.T) {
func TestTranslateTimeseries(t *testing.T) {
ctx := xctx.New()
resolution := 10 * time.Second
steps := 1
start := time.Now()
end := start.Add(time.Duration(steps) * resolution)
steps := 3
start := time.Now().Truncate(resolution).Add(time.Second)
end := start.Add(time.Duration(steps) * resolution).Add(time.Second * -2)

// NB: truncated steps should have 1 less data point than input series since
// the first data point is not valid.
truncatedSteps := steps - 1
truncated := start.Truncate(resolution).Add(resolution)
truncatedEnd := truncated.Add(resolution * time.Duration(truncatedSteps))

expected := 5
seriesList := make(m3ts.SeriesList, expected)
for i := 0; i < expected; i++ {
Expand All @@ -128,7 +135,13 @@ func TestTranslateTimeseries(t *testing.T) {

require.Equal(t, expected, len(translated))
for i, tt := range translated {
ex := []float64{float64(i)}
ex := make([]float64, truncatedSteps)
for j := range ex {
ex[j] = float64(i)
}

assert.Equal(t, truncated, tt.StartTime())
assert.Equal(t, truncatedEnd, tt.EndTime())
assert.Equal(t, ex, tt.SafeValues())
assert.Equal(t, fmt.Sprint("a", i), tt.Name())
}
Expand All @@ -137,9 +150,15 @@ func TestTranslateTimeseries(t *testing.T) {
func TestTranslateTimeseriesWithTags(t *testing.T) {
ctx := xctx.New()
resolution := 10 * time.Second
steps := 1
start := time.Now()
end := start.Add(time.Duration(steps) * resolution)
steps := 3
start := time.Now().Truncate(resolution).Add(time.Second)
end := start.Add(time.Duration(steps) * resolution).Add(time.Second * -2)

// NB: truncated steps should have 1 less data point than input series since
// the first data point is not valid.
truncatedSteps := steps - 1
truncated := start.Truncate(resolution).Add(resolution)
truncatedEnd := truncated.Add(resolution * time.Duration(truncatedSteps))
expected := 5
seriesList := make(m3ts.SeriesList, expected)
for i := 0; i < expected; i++ {
Expand All @@ -150,7 +169,7 @@ func TestTranslateTimeseriesWithTags(t *testing.T) {
}

resos := make([]int64, 0, expected)
for _ = range seriesList {
for range seriesList {
resos = append(resos, int64(resolution))
}

Expand All @@ -166,16 +185,22 @@ func TestTranslateTimeseriesWithTags(t *testing.T) {

require.Equal(t, expected, len(translated))
for i, tt := range translated {
ex := []float64{float64(i)}
ex := make([]float64, truncatedSteps)
for j := range ex {
ex[j] = float64(i)
}

assert.Equal(t, truncated, tt.StartTime())
assert.Equal(t, truncatedEnd, tt.EndTime())
assert.Equal(t, ex, tt.SafeValues())
assert.Equal(t, fmt.Sprint("a", i), tt.Name())
}
}

func TestFetchByQuery(t *testing.T) {
store := mock.NewMockStorage()
start := time.Now().Add(time.Hour * -1)
resolution := 10 * time.Second
start := time.Now().Add(time.Hour * -1).Truncate(resolution).Add(time.Second)
steps := 3
vals := m3ts.NewFixedStepValues(resolution, steps, 3, start)
seriesList := m3ts.SeriesList{
Expand Down Expand Up @@ -203,7 +228,7 @@ func TestFetchByQuery(t *testing.T) {
wrapper := NewM3WrappedStorage(store, enforcer, instrument.NewOptions())
ctx := xctx.New()
ctx.SetRequestContext(context.TODO())
end := time.Now()
end := start.Add(time.Duration(steps) * resolution)
opts := FetchOptions{
StartTime: start,
EndTime: end,
Expand All @@ -218,7 +243,8 @@ func TestFetchByQuery(t *testing.T) {
require.Equal(t, 1, len(result.SeriesList))
series := result.SeriesList[0]
assert.Equal(t, "a", series.Name())
assert.Equal(t, []float64{3, 3, 3}, series.SafeValues())
// NB: last point is expected to be truncated.
assert.Equal(t, []float64{3, 3}, series.SafeValues())

// NB: ensure the fetch was called with the base enforcer's child correctly
assert.Equal(t, childEnforcer, store.LastFetchOptions().Enforcer)
Expand Down