Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[downsampler] Flesh comments in metrics_appender.go #3803

Merged
merged 3 commits into from
Oct 1, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions src/cmd/services/m3coordinator/downsample/metrics_appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,9 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
// Reuse a slice to keep the current staged metadatas we will apply.
a.curr.Pipelines = a.curr.Pipelines[:0]

// First, process any override explicitly provided as part of request
// (via request headers that specify target namespaces).
if opts.Override {
// Process an override explicitly provided as part of request.
for _, rule := range opts.OverrideRules.MappingRules {
stagedMetadatas, err := rule.StagedMetadatas()
if err != nil {
Expand All @@ -248,12 +249,9 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
}, nil
}

// NB(r): First apply mapping rules to see which storage policies
// have been applied, any that have been applied as part of
// mapping rules that exact match a default storage policy will be
// skipped when applying default rules, so as to avoid storing
// the same metrics in the same namespace with the same metric
// name and tags (i.e. overwriting each other).
// Next, apply any mapping rules that match. We track which storage policies have been applied based on the
// mapping rules that match. Any storage policies that have been applied will be skipped when applying
// the auto-mapping rules to avoid redundant writes (i.e. overwriting each other).
var (
ruleStagedMetadatas = matchResult.ForExistingIDAt(nowNanos)
dropApplyResult metadata.ApplyOrRemoveDropPoliciesResult
Expand Down Expand Up @@ -293,13 +291,12 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
a.curr.Pipelines = append(a.curr.Pipelines, pipelines.Pipelines...)
}

// Always aggregate any default staged metadatas with a few exceptions.
// Exceptions are:
// Next, we cover auto-mapping (otherwise referred to as default) rules.
// We always aggregate any default rules with a few exceptions:
// 1. A mapping rule has provided an override for a storage policy,
// if so then skip aggregating for that storage policy).
// 2. Any type of drop rule has been set, since they should only
// impact mapping rules, not default staged metadatas provided from
// auto-mapping rules (i.e. default namespace aggregation).
// if so then skip aggregating for that storage policy.
// This is what we calculated in the step above.
// 2. Any type of drop rule has been set. Drop rules should mean that the auto-mapping rules are ignored.
if !a.curr.Pipelines.IsDropPolicySet() {
// No drop rule has been set as part of rule matching.
for idx, stagedMetadatasProto := range a.defaultStagedMetadatasProtos {
Expand Down Expand Up @@ -394,7 +391,9 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
// Apply drop policies results
a.curr.Pipelines, dropApplyResult = a.curr.Pipelines.ApplyOrRemoveDropPolicies()

// Skip sending to downsampler if there's a drop policy or no pipeline defined.
// Now send the results of mapping / auto-mapping rules to the relevant downsampler.
// We explicitly skip sending if there's no work to be done: specifically
// if there's a drop policy or if the staged metadata is a no-op.
if len(a.curr.Pipelines) > 0 && !a.curr.IsDropPolicyApplied() && !a.curr.IsDefault() {
// Send to downsampler if we have something in the pipeline.
a.debugLogMatch("downsampler using built mapping staged metadatas",
Expand All @@ -405,6 +404,7 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
}
}

// Finally, process and deliver staged metadata resulting from rollup rules.
numRollups := matchResult.NumNewRollupIDs()
for i := 0; i < numRollups; i++ {
rollup := matchResult.ForNewRollupIDsAt(i, nowNanos)
Expand Down