Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[aggregator] keep metric type during the aggregation" #3099

Merged
merged 2 commits into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions scripts/development/m3_stack/m3coordinator-aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,3 @@ carbon:

tagOptions:
idScheme: quoted

storeMetricsType: true
2 changes: 0 additions & 2 deletions scripts/development/m3_stack/m3coordinator-standard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,3 @@ carbon:

tagOptions:
idScheme: quoted

storeMetricsType: true
2 changes: 0 additions & 2 deletions scripts/docker-integration-tests/aggregator/m3coordinator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,3 @@ ingest:
retry:
maxBackoff: 10s
jitter: true

storeMetricsType: true
52 changes: 0 additions & 52 deletions scripts/docker-integration-tests/aggregator/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,12 @@ function prometheus_remote_write {
local label1_value=${label1_value:-label1}
local label2_name=${label2_name:-label2}
local label2_value=${label2_value:-label2}
local metric_type=${metric_type:counter}

network_name="aggregator"
network=$(docker network ls | fgrep $network_name | tr -s ' ' | cut -f 1 -d ' ' | tail -n 1)
out=$((docker run -it --rm --network $network \
$PROMREMOTECLI_IMAGE \
-u http://m3coordinator01:7202/api/v1/prom/remote/write \
-h M3-Prom-Type:${metric_type} \
-t __name__:${metric_name} \
-t ${label0_name}:${label0_value} \
-t ${label1_name}:${label1_value} \
Expand Down Expand Up @@ -219,22 +217,6 @@ function prometheus_query_native {
return $?
}

function dbnode_fetch {
local namespace=${namespace}
local id=${id}
local rangeStart=${rangeStart}
local rangeEnd=${rangeEnd}
local jq_path=${jq_path:-}
local expected_value=${expected_value:-}

result=$(curl -s \
"0.0.0.0:9002/fetch" \
"-d" \
"{\"namespace\": \"${namespace}\", \"id\": \"${id}\", \"rangeStart\": ${rangeStart}, \"rangeEnd\": ${rangeEnd}}" | jq -r "${jq_path}")
test "$result" = "$expected_value"
return $?
}

function test_aggregated_rollup_rule {
resolution_seconds="10"
now=$(date +"%s")
Expand All @@ -252,7 +234,6 @@ function test_aggregated_rollup_rule {
label0_name="app" label0_value="nginx_edge" \
label1_name="status_code" label1_value="500" \
label2_name="endpoint" label2_value="/foo/bar" \
metric_type="counter" \
prometheus_remote_write \
http_requests $write_at $value \
true "Expected request to succeed" \
Expand All @@ -270,7 +251,6 @@ function test_aggregated_rollup_rule {
label0_name="app" label0_value="nginx_edge" \
label1_name="status_code" label1_value="500" \
label2_name="endpoint" label2_value="/foo/baz" \
metric_type="gauge" \
prometheus_remote_write \
http_requests $write_at $value \
true "Expected request to succeed" \
Expand Down Expand Up @@ -304,38 +284,6 @@ function test_aggregated_rollup_rule {
retry_with_backoff prometheus_query_native
}

function test_metric_type_survives_aggregation {
now=$(date +"%s")

echo "Test metric type should be kept after aggregation"

# Emit values for endpoint /foo/bar (to ensure right values aggregated)
write_at="$now_truncated"
value="42"

metric_type="counter" \
prometheus_remote_write \
metric_type_test $now $value \
true "Expected request to succeed" \
200 "Expected request to return status code 200"

start=$(( $now - 3600 ))
end=$(( $now + 3600 ))
jq_path=".datapoints[0].annotation"

echo "Test query metric type"

# Test by metric types are stored in aggregated namespace
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 \
namespace="agg" \
id='{__name__=\"metric_type_test\",label0=\"label0\",label1=\"label1\",label2=\"label2\"}' \
rangeStart=${start} \
rangeEnd=${end} \
jq_path="$jq_path" expected_value="CAEQAQ==" \
retry_with_backoff dbnode_fetch
}

echo "Run tests"
test_aggregated_graphite_metric
test_aggregated_rollup_rule
test_metric_type_survives_aggregation
2 changes: 0 additions & 2 deletions src/aggregator/aggregation/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ func TestCounterCustomAggregationType(t *testing.T) {
require.Equal(t, float64(338350), v)
case aggregation.Stdev:
require.InDelta(t, 29.01149, v, 0.001)
case aggregation.Last:
require.Equal(t, 0.0, v)
default:
require.Equal(t, float64(0), v)
require.False(t, aggType.IsValidForCounter())
Expand Down
1 change: 0 additions & 1 deletion src/aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,6 @@ func (agg *aggregator) AddPassthrough(
ChunkedID: id.ChunkedID{
Data: []byte(metric.ID),
},
Type: metric.Type,
TimeNanos: metric.TimeNanos,
Value: metric.Value,
},
Expand Down
7 changes: 3 additions & 4 deletions src/aggregator/aggregator/counter_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/aggregator/aggregator/elem_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,9 @@ func TestCounterElemBaseResetSetData(t *testing.T) {

func TestCounterElemBaseResetSetDataInvalidTypes(t *testing.T) {
e := counterElemBase{}
err := e.ResetSetData(nil, maggregation.Types{maggregation.P10}, false)
err := e.ResetSetData(nil, maggregation.Types{maggregation.Last}, false)
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "invalid aggregation types P10 for counter"))
require.True(t, strings.Contains(err.Error(), "invalid aggregation types Last for counter"))
}

func TestTimerElemBase(t *testing.T) {
Expand Down
7 changes: 2 additions & 5 deletions src/aggregator/aggregator/elem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,8 @@ func TestCounterResetSetData(t *testing.T) {

func TestCounterResetSetDataInvalidAggregationType(t *testing.T) {
opts := NewOptions()
ce := MustNewCounterElem(nil, policy.EmptyStoragePolicy, maggregation.DefaultTypes,
applied.DefaultPipeline, testNumForwardedTimes, NoPrefixNoSuffix, opts)
err := ce.ResetSetData(testCounterID, testStoragePolicy, maggregation.Types{maggregation.P10},
applied.DefaultPipeline, 0, NoPrefixNoSuffix)
ce := MustNewCounterElem(nil, policy.EmptyStoragePolicy, maggregation.DefaultTypes, applied.DefaultPipeline, testNumForwardedTimes, NoPrefixNoSuffix, opts)
err := ce.ResetSetData(testCounterID, testStoragePolicy, maggregation.Types{maggregation.Last}, applied.DefaultPipeline, 0, NoPrefixNoSuffix)
require.Error(t, err)
}

Expand Down Expand Up @@ -1812,7 +1810,6 @@ func testFlushLocalMetricFn() (
return func(
idPrefix []byte,
id id.RawID,
metricType metric.Type,
idSuffix []byte,
timeNanos int64,
value float64,
Expand Down
2 changes: 0 additions & 2 deletions src/aggregator/aggregator/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package aggregator
import (
"time"

"github.com/m3db/m3/src/metrics/metric"
"github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/policy"
)
Expand Down Expand Up @@ -84,7 +83,6 @@ const (
type flushLocalMetricFn func(
idPrefix []byte,
id id.RawID,
metricType metric.Type,
idSuffix []byte,
timeNanos int64,
value float64,
Expand Down
7 changes: 3 additions & 4 deletions src/aggregator/aggregator/gauge_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions src/aggregator/aggregator/generic_elem.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,10 +537,10 @@ func (e *GenericElem) processValueWithAggregationLock(
for _, point := range toFlush {
switch e.idPrefixSuffixType {
case NoPrefixNoSuffix:
flushLocalFn(nil, e.id, metric.GaugeType, nil, point.TimeNanos, point.Value, e.sp)
flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp)
case WithPrefixWithSuffix:
flushLocalFn(e.FullPrefix(e.opts), e.id, metric.GaugeType,
e.TypeStringFor(e.aggTypesOpts, aggType), point.TimeNanos, point.Value, e.sp)
flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType),
point.TimeNanos, point.Value, e.sp)
}
}
} else {
Expand Down
1 change: 0 additions & 1 deletion src/aggregator/aggregator/handler/writer/protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ func (w *protobufWriter) prepare(mp aggregated.ChunkedMetricWithStoragePolicy) (
w.m.ID = append(w.m.ID, mp.Suffix...)
w.m.Metric.TimeNanos = mp.TimeNanos
w.m.Metric.Value = mp.Value
w.m.Metric.Type = mp.Type
w.m.StoragePolicy = mp.StoragePolicy
shard := w.shardFn(w.m.ID, w.numShards)
return w.m, shard
Expand Down
4 changes: 0 additions & 4 deletions src/aggregator/aggregator/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

"github.com/m3db/m3/src/aggregator/aggregator/handler"
"github.com/m3db/m3/src/aggregator/aggregator/handler/writer"
"github.com/m3db/m3/src/metrics/metric"
"github.com/m3db/m3/src/metrics/metric/aggregated"
metricid "github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/policy"
Expand Down Expand Up @@ -435,7 +434,6 @@ func (l *baseMetricList) flushBefore(beforeNanos int64, flushType flushType) {
func (l *baseMetricList) consumeLocalMetric(
idPrefix []byte,
id metricid.RawID,
metricType metric.Type,
idSuffix []byte,
timeNanos int64,
value float64,
Expand All @@ -449,7 +447,6 @@ func (l *baseMetricList) consumeLocalMetric(
chunkedMetricWithPolicy := aggregated.ChunkedMetricWithStoragePolicy{
ChunkedMetric: aggregated.ChunkedMetric{
ChunkedID: chunkedID,
Type: metricType,
TimeNanos: timeNanos,
Value: value,
},
Expand All @@ -466,7 +463,6 @@ func (l *baseMetricList) consumeLocalMetric(
func (l *baseMetricList) discardLocalMetric(
idPrefix []byte,
id metricid.RawID,
metricType metric.Type,
idSuffix []byte,
timeNanos int64,
value float64,
Expand Down
2 changes: 0 additions & 2 deletions src/aggregator/aggregator/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,6 @@ func TestTimedMetricListFlushConsumingAndCollectingTimedMetrics(t *testing.T) {
ChunkedID: id.ChunkedID{
Data: ep.metric.ID,
},
Type: ep.metric.Type,
TimeNanos: alignedStart,
Value: ep.metric.Value,
},
Expand Down Expand Up @@ -1057,7 +1056,6 @@ func TestForwardedMetricListLastStepLocalFlush(t *testing.T) {
Prefix: ep.expectedPrefix,
Data: ep.metric.ID,
},
Type: ep.metric.Type,
TimeNanos: alignedStart,
Value: ep.metric.Values[0],
},
Expand Down
7 changes: 3 additions & 4 deletions src/aggregator/aggregator/timer_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion src/aggregator/generated-source-files.mk
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ genny-all: genny-aggregator-counter-elem genny-aggregator-timer-elem genny-aggre
genny-aggregator-counter-elem:
cat $(m3db_package_path)/src/aggregator/aggregator/generic_elem.go \
| awk '/^package/{i++}i' \
| sed 's/metric.GaugeType/metric.CounterType/' \
| genny -out=$(m3db_package_path)/src/aggregator/aggregator/counter_elem_gen.go -pkg=aggregator gen \
"timedAggregation=timedCounter lockedAggregation=lockedCounterAggregation typeSpecificAggregation=counterAggregation typeSpecificElemBase=counterElemBase genericElemPool=CounterElemPool GenericElem=CounterElem"

Expand Down
5 changes: 1 addition & 4 deletions src/cmd/services/m3coordinator/ingest/m3msg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ func (cfg Configuration) NewIngester(
appender storage.Appender,
tagOptions models.TagOptions,
instrumentOptions instrument.Options,
storeMetricsType bool,
) (*Ingester, error) {
opts, err := cfg.newOptions(appender, tagOptions, instrumentOptions, storeMetricsType)
opts, err := cfg.newOptions(appender, tagOptions, instrumentOptions)
if err != nil {
return nil, err
}
Expand All @@ -59,7 +58,6 @@ func (cfg Configuration) newOptions(
appender storage.Appender,
tagOptions models.TagOptions,
instrumentOptions instrument.Options,
storeMetricsType bool,
) (Options, error) {
scope := instrumentOptions.MetricsScope().Tagged(
map[string]string{"component": "ingester"},
Expand Down Expand Up @@ -100,6 +98,5 @@ func (cfg Configuration) newOptions(
RetryOptions: cfg.Retry.NewOptions(scope),
Sampler: sampler,
InstrumentOptions: instrumentOptions,
StoreMetricsType: storeMetricsType,
}, nil
}
Loading