Skip to content

Commit

Permalink
[aggregator] Add compatibility for rollup rules with timed metrics (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored Apr 10, 2020
1 parent 882ecb1 commit 6fab578
Show file tree
Hide file tree
Showing 53 changed files with 1,820 additions and 484 deletions.
2 changes: 1 addition & 1 deletion scripts/development/m3_stack/m3aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ aggregator:
readBufferSize: 256
forwarding:
maxSingleDelay: 5s
entryTTL: 6h
entryTTL: 1h
entryCheckInterval: 10m
maxTimerBatchSizePerWrite: 140
defaultStoragePolicies:
Expand Down
8 changes: 4 additions & 4 deletions scripts/docker-integration-tests/aggregator/m3aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ aggregator:
envVarName: M3AGGREGATOR_HOST_ID
instanceID:
type: host_id
verboseErrors: true
metricPrefix: ""
counterPrefix: ""
timerPrefix: ""
Expand Down Expand Up @@ -230,6 +231,7 @@ aggregator:
hashType: murmur32
bufferDurationBeforeShardCutover: 10m
bufferDurationAfterShardCutoff: 10m
bufferDurationForFutureTimedMetric: 10m # Allow test to write into future.
resignTimeout: 1m
flushTimesManager:
kvConfig:
Expand Down Expand Up @@ -319,12 +321,10 @@ aggregator:
writeBufferSize: 16384
readBufferSize: 256
forwarding:
maxSingleDelay: 5s
entryTTL: 6h
maxConstDelay: 1m # Need to add some buffer window, since timed metrics by default are delayed by 1min.
entryTTL: 1h
entryCheckInterval: 10m
maxTimerBatchSizePerWrite: 140
defaultStoragePolicies:
- 10s:2d
maxNumCachedSourceSets: 2
discardNaNAggregatedValues: true
entryPool:
Expand Down
14 changes: 14 additions & 0 deletions scripts/docker-integration-tests/aggregator/m3coordinator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,20 @@ clusters:
readConsistencyLevel: unstrict_majority

downsample:
rules:
rollupRules:
- name: "requests per second by status code"
filter: "__name__:http_requests app:* status_code:* endpoint:*"
transforms:
- transform:
type: "PerSecond"
- rollup:
metricName: "http_requests_by_status_code"
groupBy: ["app", "status_code", "endpoint"]
aggregations: ["Sum"]
storagePolicies:
- resolution: 10s
retention: 6h
remoteAggregator:
client:
placementKV:
Expand Down
157 changes: 151 additions & 6 deletions scripts/docker-integration-tests/aggregator/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,15 @@ set -xe
source $GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/common.sh
REVISION=$(git rev-parse HEAD)
COMPOSE_FILE=$GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/aggregator/docker-compose.yml
# quay.io/m3db/prometheus_remote_client_golang @ v0.4.3
PROMREMOTECLI_IMAGE=quay.io/m3db/prometheus_remote_client_golang@sha256:fc56df819bff9a5a087484804acf3a584dd4a78c68900c31a28896ed66ca7e7b
JQ_IMAGE=realguess/jq:1.4@sha256:300c5d9fb1d74154248d155ce182e207cf6630acccbaadd0168e18b15bfaa786
export REVISION

echo "Pull containers required for test"
docker pull $PROMREMOTECLI_IMAGE
docker pull $JQ_IMAGE

echo "Run m3dbnode"
docker-compose -f ${COMPOSE_FILE} up -d dbnode01

Expand Down Expand Up @@ -107,7 +114,7 @@ function read_carbon {
end=$(date +%s)
start=$(($end-1000))
RESPONSE=$(curl -sSfg "http://${COORDINATOR_API}/api/v1/graphite/render?target=$target&from=$start&until=$end")
test "$(echo "$RESPONSE" | jq ".[0].datapoints | .[][0] | select(. != null)" | tail -n 1)" = "$expected_val"
test "$(echo "$RESPONSE" | jq ".[0].datapoints | .[][0] | select(. != null)" | jq -s last)" = "$expected_val"
return $?
}

Expand All @@ -118,8 +125,146 @@ bash -c 'while true; do t=$(date +%s); echo "foo.bar.baz 40 $t" | nc 0.0.0.0 720
# Track PID to kill on exit
METRIC_EMIT_PID="$!"

# Read back the aggregated averaged metric, we configured graphite
# aggregation policy to average each tile and we are emitting
# values 40 and 44 to get an average of 42 each tile
echo "Read back aggregated averaged metric"
ATTEMPTS=10 TIMEOUT=1 retry_with_backoff read_carbon foo.bar.* 42
function test_aggregated_graphite_metric {
# Read back the aggregated averaged metric: the graphite aggregation
# policy is configured to average each tile, and the background emitter
# writes values 40 and 44, so each tile should read back as 42.
echo "Read back aggregated averaged metric"
ATTEMPTS=40 TIMEOUT=1 MAX_TIMEOUT=4 retry_with_backoff read_carbon foo.bar.* 42

# Stop the background carbon metric emitter started earlier, and reset
# its tracked PID (presumably checked by an exit/cleanup trap — confirm)
# so cleanup does not try to kill it a second time.
kill $METRIC_EMIT_PID
export METRIC_EMIT_PID="-1"
}

function prometheus_remote_write {
  # Write a single datapoint to the coordinator's Prometheus remote write
  # endpoint (via the promremotecli docker image) and verify both the
  # client-reported success flag and the returned HTTP status code.
  #
  # Positional args:
  #   $1 metric name, $2 datapoint timestamp, $3 datapoint value,
  #   $4 expected success ("true"/"false"), $5 message on success mismatch,
  #   $6 expected HTTP status code, $7 message on status mismatch.
  # Optional label overrides are passed as environment variables:
  #   label0_name/label0_value, label1_*, label2_*.
  local metric_name=$1
  local datapoint_timestamp=$2
  local datapoint_value=$3
  local expect_success=$4
  local expect_success_err=$5
  local expect_status=$6
  local expect_status_err=$7
  local label0_name=${label0_name:-label0}
  local label0_value=${label0_value:-label0}
  local label1_name=${label1_name:-label1}
  local label1_value=${label1_value:-label1}
  local label2_name=${label2_name:-label2}
  local label2_value=${label2_value:-label2}

  network_name="aggregator"
  network=$(docker network ls | grep -F "$network_name" | tr -s ' ' | cut -f 1 -d ' ')
  # NB: the space between "$(" and "(" is required so bash parses this as a
  # command substitution containing a subshell rather than an arithmetic
  # expansion "$((" (ShellCheck SC1102). "|| true" keeps set -e from
  # aborting on a failed write; failures surface via the parsed output.
  out=$( (docker run -it --rm --network $network \
    $PROMREMOTECLI_IMAGE \
    -u http://m3coordinator01:7202/api/v1/prom/remote/write \
    -t __name__:${metric_name} \
    -t ${label0_name}:${label0_value} \
    -t ${label1_name}:${label1_value} \
    -t ${label2_name}:${label2_value} \
    -d ${datapoint_timestamp},${datapoint_value} | grep -v promremotecli_log) || true)
  success=$(echo $out | grep -v promremotecli_log | docker run --rm -i $JQ_IMAGE jq .success)
  status=$(echo $out | grep -v promremotecli_log | docker run --rm -i $JQ_IMAGE jq .statusCode)
  if [[ "$success" != "$expect_success" ]]; then
    echo $expect_success_err
    return 1
  fi
  if [[ "$status" != "$expect_status" ]]; then
    echo "${expect_status_err}: actual=${status}"
    return 1
  fi
  echo "Returned success=${success}, status=${status} as expected"
  return 0
}
function prometheus_query_native {
  # Query the coordinator's native Prometheus API and compare the
  # jq-extracted value against an expected value. Returns 0 on match.
  # All inputs arrive as environment variables: endpoint, query, params,
  # metrics_type, metrics_storage_policy, jq_path, expected_value.
  local endpoint=${endpoint:-}
  local query=${query:-}
  local params=${params:-}
  local metrics_type=${metrics_type:-}
  local metrics_storage_policy=${metrics_storage_policy:-}
  local jq_path=${jq_path:-}
  local expected_value=${expected_value:-}

  # Any extra query params get an '&' prefix so they append cleanly
  # to the query string.
  params_prefixed=""
  if [[ -n "$params" ]]; then
    params_prefixed='&'"${params}"
  fi

  # Extract the matching datapoints and keep only the last one
  # (jq -s last slurps the stream and takes its final element).
  result=$(curl -s \
    -H "M3-Metrics-Type: ${metrics_type}" \
    -H "M3-Storage-Policy: ${metrics_storage_policy}" \
    "0.0.0.0:7202/api/v1/${endpoint}?query=${query}${params_prefixed}" | jq -r "${jq_path}" | jq -s last)
  [ "$result" = "$expected_value" ]
  return $?
}
function test_aggregated_rollup_rule {
  # End-to-end test of the "requests per second by status code" rollup
  # rule: write linearly-increasing counters for two endpoints, then
  # query the rolled-up metric and expect the per-second rate for each.
  resolution_seconds="10"
  now=$(date +"%s")
  # Truncate "now" to the aggregation resolution so writes align to tiles.
  now_truncate_by=$(( $now % $resolution_seconds ))
  now_truncated=$(( $now - $now_truncate_by ))

  echo "Test write with rollup rule"

  # Emit values for endpoint /foo/bar (to ensure right values aggregated).
  # Each sample increases by rate*resolution, so PerSecond yields value_rate.
  write_at="$now_truncated"
  value="42"
  value_rate="22"
  value_inc_by=$(( $value_rate * $resolution_seconds ))
  for i in $(seq 1 10); do
    label0_name="app" label0_value="nginx_edge" \
    label1_name="status_code" label1_value="500" \
    label2_name="endpoint" label2_value="/foo/bar" \
      prometheus_remote_write \
      http_requests $write_at $value \
      true "Expected request to succeed" \
      200 "Expected request to return status code 200"
    write_at=$(( $write_at + $resolution_seconds ))
    value=$(( $value + $value_inc_by ))
  done

  # Emit values for endpoint /foo/baz (to ensure right values aggregated)
  write_at="$now_truncated"
  value="84"
  value_rate="4"
  value_inc_by=$(( $value_rate * $resolution_seconds ))
  for i in $(seq 1 10); do
    label0_name="app" label0_value="nginx_edge" \
    label1_name="status_code" label1_value="500" \
    label2_name="endpoint" label2_value="/foo/baz" \
      prometheus_remote_write \
      http_requests $write_at $value \
      true "Expected request to succeed" \
      200 "Expected request to return status code 200"
    write_at=$(( $write_at + $resolution_seconds ))
    value=$(( $value + $value_inc_by ))
  done

  # Query a window wide enough to cover all writes regardless of skew.
  start=$(( $now - 3600 ))
  end=$(( $now + 3600 ))
  step="30s"
  params_range="start=${start}"'&'"end=${end}"'&'"step=${step}"
  jq_path=".data.result[0].values | .[][1] | select(. != null)"

  echo "Test query rollup rule"

  # Test values are rolled up by PerSecond then Sum (for endpoint="/foo/bar")
  ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 \
    endpoint=query_range query="http_requests_by_status_code\{endpoint=\"/foo/bar\"\}" \
    params="$params_range" \
    jq_path="$jq_path" expected_value="22" \
    metrics_type="aggregated" metrics_storage_policy="10s:6h" \
    retry_with_backoff prometheus_query_native

  # Test values are rolled up by PerSecond then Sum (for endpoint="/foo/baz")
  ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 \
    endpoint=query_range query="http_requests_by_status_code\{endpoint=\"/foo/baz\"\}" \
    params="$params_range" \
    jq_path="$jq_path" expected_value="4" \
    metrics_type="aggregated" metrics_storage_policy="10s:6h" \
    retry_with_backoff prometheus_query_native
}

echo "Run tests"
test_aggregated_graphite_metric
test_aggregated_rollup_rule
2 changes: 1 addition & 1 deletion scripts/docker-integration-tests/carbon/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ function read_carbon {
end=$(date +%s)
start=$(($end-1000))
RESPONSE=$(curl -sSfg "http://localhost:7201/api/v1/graphite/render?target=$target&from=$start&until=$end")
test "$(echo "$RESPONSE" | jq ".[0].datapoints | .[][0] | select(. != null)" | tail -n 1)" = "$expected_val"
test "$(echo "$RESPONSE" | jq ".[0].datapoints | .[][0] | select(. != null)" | jq -s last)" = "$expected_val"
return $?
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ function prometheus_query_native {
result=$(curl -s \
-H "M3-Metrics-Type: ${metrics_type}" \
-H "M3-Storage-Policy: ${metrics_storage_policy}" \
"0.0.0.0:7201/api/v1/${endpoint}?query=${query}${params_prefixed}" | jq -r "${jq_path}")
"0.0.0.0:7201/api/v1/${endpoint}?query=${query}${params_prefixed}" | jq -r "${jq_path}" | jq -s last)
test "$result" = "$expected_value"
return $?
}
Expand Down
23 changes: 18 additions & 5 deletions src/aggregator/aggregation/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ import (
type Counter struct {
Options

sum int64
sumSq int64
count int64
max int64
min int64
lastAt time.Time
sum int64
sumSq int64
count int64
max int64
min int64
}

// NewCounter creates a new counter.
Expand All @@ -49,6 +50,15 @@ func NewCounter(opts Options) Counter {

// Update updates the counter value.
func (c *Counter) Update(timestamp time.Time, value int64) {
if c.lastAt.IsZero() || timestamp.After(c.lastAt) {
// NB(r): Only set the last value if this value arrives
// after the wall clock timestamp of previous values, not
// the arrival time (i.e. order received).
c.lastAt = timestamp
} else {
c.Options.Metrics.Counter.IncValuesOutOfOrder()
}

c.sum += value

c.count++
Expand All @@ -64,6 +74,9 @@ func (c *Counter) Update(timestamp time.Time, value int64) {
}
}

// LastAt returns the wall clock timestamp of the most recent
// value received by this counter.
func (c *Counter) LastAt() time.Time {
	return c.lastAt
}

// Count returns how many values this counter has received.
func (c *Counter) Count() int64 {
	return c.count
}

Expand Down
7 changes: 5 additions & 2 deletions src/aggregator/aggregation/gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ const (
type Gauge struct {
Options

last float64
lastAt time.Time
last float64
sum float64
sumSq float64
count int64
Expand All @@ -59,8 +59,8 @@ func (g *Gauge) Update(timestamp time.Time, value float64) {
// NB(r): Only set the last value if this value arrives
// after the wall clock timestamp of previous values, not
// the arrival time (i.e. order received).
g.last = value
g.lastAt = timestamp
g.last = value
} else {
g.Options.Metrics.Gauge.IncValuesOutOfOrder()
}
Expand All @@ -79,6 +79,9 @@ func (g *Gauge) Update(timestamp time.Time, value float64) {
}
}

// LastAt returns the wall clock timestamp of the most recent
// value received by this gauge.
func (g *Gauge) LastAt() time.Time {
	return g.lastAt
}

// Last returns the most recently received gauge value.
func (g *Gauge) Last() float64 {
	return g.last
}

Expand Down
24 changes: 22 additions & 2 deletions src/aggregator/aggregation/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@ type Options struct {

// Metrics is a set of metrics that can be used by elements.
type Metrics struct {
Gauge GaugeMetrics
Counter CounterMetrics
Gauge GaugeMetrics
}

// CounterMetrics is a set of counter metrics that can be used by all counters.
type CounterMetrics struct {
	// valuesOutOfOrder counts updates whose timestamp was not strictly
	// after the previously observed timestamp.
	valuesOutOfOrder tally.Counter
}

// GaugeMetrics is a set of gauge metrics can be used by all gauges.
Expand All @@ -54,7 +60,21 @@ type GaugeMetrics struct {
func NewMetrics(scope tally.Scope) Metrics {
scope = scope.SubScope("aggregation")
return Metrics{
Gauge: newGaugeMetrics(scope.SubScope("gauges")),
Counter: newCounterMetrics(scope.SubScope("counters")),
Gauge: newGaugeMetrics(scope.SubScope("gauges")),
}
}

// newCounterMetrics constructs the counter aggregation metrics,
// registered under the given scope.
func newCounterMetrics(scope tally.Scope) CounterMetrics {
	return CounterMetrics{
		valuesOutOfOrder: scope.Counter("values-out-of-order"),
	}
}

// IncValuesOutOfOrder increments the out-of-order values counter,
// or is a no-op if the metrics are not initialized (zero value).
func (m CounterMetrics) IncValuesOutOfOrder() {
	if m.valuesOutOfOrder != nil {
		m.valuesOutOfOrder.Inc(1)
	}
}

Expand Down
Loading

0 comments on commit 6fab578

Please sign in to comment.