From 5272a694108c25fcde3a43c991780ebbbd5ab804 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Tue, 3 Sep 2024 14:55:15 +0200 Subject: [PATCH 1/5] [processor/deltatocumulative]: drop bad samples --- .../internal/data/datatest/compare/compare.go | 10 +- .../internal/metrics/iter.go | 38 +++ .../internal/metrics/metrics.go | 8 + .../internal/metrics/util.go | 25 -- .../internal/putil/pslice/pslice.go | 10 + .../internal/streams/data.go | 19 +- .../internal/streams/data_test.go | 2 +- .../internal/streams/errors.go | 4 + .../internal/telemetry/metrics.go | 2 + .../deltatocumulativeprocessor/processor.go | 10 +- .../processor_test.go | 284 ++++++++++++++++++ 11 files changed, 368 insertions(+), 44 deletions(-) create mode 100644 processor/deltatocumulativeprocessor/internal/metrics/iter.go delete mode 100644 processor/deltatocumulativeprocessor/internal/metrics/util.go create mode 100644 processor/deltatocumulativeprocessor/processor_test.go diff --git a/processor/deltatocumulativeprocessor/internal/data/datatest/compare/compare.go b/processor/deltatocumulativeprocessor/internal/data/datatest/compare/compare.go index 91f58ff8b0f0..eb8c0f11174a 100644 --- a/processor/deltatocumulativeprocessor/internal/data/datatest/compare/compare.go +++ b/processor/deltatocumulativeprocessor/internal/data/datatest/compare/compare.go @@ -14,14 +14,14 @@ import ( var Opts = []cmp.Option{ cmpopts.EquateApprox(0, 1e-9), cmp.Exporter(func(ty reflect.Type) bool { - return strings.HasPrefix(ty.PkgPath(), "go.opentelemetry.io/collector/pdata") + return strings.HasPrefix(ty.PkgPath(), "go.opentelemetry.io/collector/pdata") || strings.HasPrefix(ty.PkgPath(), "github.com/open-telemetry/opentelemetry-collector-contrib") }), } -func Equal[T any](a, b T) bool { - return cmp.Equal(a, b, Opts...) +func Equal[T any](a, b T, opts ...cmp.Option) bool { + return cmp.Equal(a, b, append(Opts, opts...)...) } -func Diff[T any](a, b T) string { - return cmp.Diff(a, b, Opts...) +func Diff[T any](a, b T, opts ...cmp.Option) string { + return cmp.Diff(a, b, append(Opts, opts...)...) } diff --git a/processor/deltatocumulativeprocessor/internal/metrics/iter.go b/processor/deltatocumulativeprocessor/internal/metrics/iter.go new file mode 100644 index 000000000000..9902d22a2eec --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/metrics/iter.go @@ -0,0 +1,38 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" + +import ( + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice" +) + +func All(md pmetric.Metrics) func(func(Metric) bool) { + return func(yield func(Metric) bool) { + var ok bool + pslice.All(md.ResourceMetrics())(func(rm pmetric.ResourceMetrics) bool { + pslice.All(rm.ScopeMetrics())(func(sm pmetric.ScopeMetrics) bool { + pslice.All(sm.Metrics())(func(m pmetric.Metric) bool { + ok = yield(From(rm.Resource(), sm.Scope(), m)) + return ok + }) + return ok + }) + return ok + }) + } +} + +func Filter(md pmetric.Metrics, keep func(Metric) bool) { + md.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool { + rm.ScopeMetrics().RemoveIf(func(sm pmetric.ScopeMetrics) bool { + sm.Metrics().RemoveIf(func(m pmetric.Metric) bool { + return !keep(From(rm.Resource(), sm.Scope(), m)) + }) + return sm.Metrics().Len() == 0 + }) + return rm.ScopeMetrics().Len() == 0 + }) +} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go index 6b705f5a7d24..50c802c70e1d 100644 --- a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go @@ -22,6 +22,14 @@ func (m *Metric) Ident() Ident { return identity.OfResourceMetric(m.res, m.scope, m.Metric) } +func (m *Metric) Resource() pcommon.Resource { + return m.res +} + +func (m *Metric) Scope() pcommon.InstrumentationScope { + return m.scope +} + func From(res pcommon.Resource, scope pcommon.InstrumentationScope, metric pmetric.Metric) Metric { return Metric{res: res, scope: scope, Metric: metric} } diff --git a/processor/deltatocumulativeprocessor/internal/metrics/util.go b/processor/deltatocumulativeprocessor/internal/metrics/util.go deleted file mode 100644 index 985716b3cc0f..000000000000 --- a/processor/deltatocumulativeprocessor/internal/metrics/util.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" - -import "go.opentelemetry.io/collector/pdata/pmetric" - -func Filter(metrics pmetric.Metrics, fn func(m Metric) bool) { - metrics.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool { - rm.ScopeMetrics().RemoveIf(func(sm pmetric.ScopeMetrics) bool { - sm.Metrics().RemoveIf(func(m pmetric.Metric) bool { - return !fn(From(rm.Resource(), sm.Scope(), m)) - }) - return false - }) - return false - }) -} - -func Each(metrics pmetric.Metrics, fn func(m Metric)) { - Filter(metrics, func(m Metric) bool { - fn(m) - return true - }) -} diff --git a/processor/deltatocumulativeprocessor/internal/putil/pslice/pslice.go b/processor/deltatocumulativeprocessor/internal/putil/pslice/pslice.go index 5a0c2b64d863..6cc97af04132 100644 --- a/processor/deltatocumulativeprocessor/internal/putil/pslice/pslice.go +++ b/processor/deltatocumulativeprocessor/internal/putil/pslice/pslice.go @@ -19,3 +19,13 @@ func Equal[E comparable, S Slice[E]](a, b S) bool { } return true } + +func All[E any, S Slice[E]](slice S) func(func(E) bool) { + return func(yield func(E) bool) { + for i := 0; i < slice.Len(); i++ { + if !yield(slice.At(i)) { + break + } + } + } +} diff --git a/processor/deltatocumulativeprocessor/internal/streams/data.go b/processor/deltatocumulativeprocessor/internal/streams/data.go index 0c54be543c45..532b4b8289e1 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/data.go +++ b/processor/deltatocumulativeprocessor/internal/streams/data.go @@ -9,21 +9,16 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice" ) -// Samples returns an Iterator over each sample of all streams in the metric -func Samples[D data.Point[D]](m metrics.Data[D]) Seq[D] { - mid := m.Ident() - - return func(yield func(Ident, D) bool) bool { - for i := 0; i < m.Len(); i++ { - dp := m.At(i) +func Datapoints[P data.Point[P], List metrics.Data[P]](dps List) func(func(identity.Stream, P) bool) { + return func(yield func(identity.Stream, P) bool) { + mid := dps.Ident() + pslice.All(dps)(func(dp P) bool { id := identity.OfStream(mid, dp) - if !yield(id, dp) { - break - } - } - return false + return yield(id, dp) + }) } } diff --git a/processor/deltatocumulativeprocessor/internal/streams/data_test.go b/processor/deltatocumulativeprocessor/internal/streams/data_test.go index f8180713f86f..76ae72ee1ec5 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/data_test.go +++ b/processor/deltatocumulativeprocessor/internal/streams/data_test.go @@ -24,7 +24,7 @@ func BenchmarkSamples(b *testing.B) { dps := generate(b.N) b.ResetTimer() - streams.Samples(dps)(func(id streams.Ident, dp data.Number) bool { + streams.Datapoints(dps)(func(id streams.Ident, dp data.Number) bool { rdp = dp rid = id return true diff --git a/processor/deltatocumulativeprocessor/internal/streams/errors.go b/processor/deltatocumulativeprocessor/internal/streams/errors.go index e69827a6212c..c0638e091502 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/errors.go +++ b/processor/deltatocumulativeprocessor/internal/streams/errors.go @@ -19,3 +19,7 @@ type StreamErr struct { func (e StreamErr) Error() string { return fmt.Sprintf("%s: %s", e.Ident, e.Err) } + +func (e StreamErr) Unwrap() error { + return e.Err +} diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go index cbf52c09ff94..8062fc8388a8 100644 --- a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go @@ -119,8 +119,10 @@ func (f Faults[T]) Store(id streams.Ident, v T) error { return err case errors.As(err, &olderStart): inc(f.dps.dropped, reason("older-start")) + return streams.Drop case errors.As(err, &outOfOrder): inc(f.dps.dropped, reason("out-of-order")) + return streams.Drop case errors.As(err, &limit): inc(f.dps.dropped, reason("stream-limit")) // no space to store stream, drop it instead of failing silently diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go index cc63f2c90e40..e0448b350c32 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -136,7 +136,8 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro defer p.mtx.Unlock() var errs error - metrics.Each(md, func(m metrics.Metric) { + metrics.Filter(md, func(m metrics.Metric) bool { + var n int switch m.Type() { case pmetric.MetricTypeSum: sum := m.Sum() @@ -145,6 +146,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro errs = errors.Join(errs, err) sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) } + n = sum.DataPoints().Len() case pmetric.MetricTypeHistogram: hist := m.Histogram() if hist.AggregationTemporality() == pmetric.AggregationTemporalityDelta { @@ -152,6 +154,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro errs = errors.Join(errs, err) hist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) } + n = hist.DataPoints().Len() case pmetric.MetricTypeExponentialHistogram: expo := m.ExponentialHistogram() if expo.AggregationTemporality() == pmetric.AggregationTemporalityDelta { @@ -159,11 +162,16 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro errs = errors.Join(errs, err) expo.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) } + n = expo.DataPoints().Len() } + return n > 0 }) if errs != nil { return errs } + if md.MetricCount() == 0 { + return nil + } return p.next.ConsumeMetrics(ctx, md) } diff --git a/processor/deltatocumulativeprocessor/processor_test.go b/processor/deltatocumulativeprocessor/processor_test.go new file mode 100644 index 000000000000..9b95e615fea5 --- /dev/null +++ b/processor/deltatocumulativeprocessor/processor_test.go @@ -0,0 +1,284 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package deltatocumulativeprocessor_test + +import ( + "context" + "math" + "math/rand" + "strconv" + "testing" + "time" + + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processortest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + self "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest/compare" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random" +) + +func setup(t *testing.T, cfg *self.Config) (processor.Metrics, *consumertest.MetricsSink) { + t.Helper() + + next := &consumertest.MetricsSink{} + if cfg == nil { + cfg = &self.Config{MaxStale: 0, MaxStreams: math.MaxInt} + } + + proc, err := self.NewFactory().CreateMetricsProcessor( + context.Background(), + processortest.NewNopSettings(), + cfg, + next, + ) + require.NoError(t, err) + + return proc, next +} + +// TestAccumulation verifies stream identification works correctly by writing +// 100 random dps spread across 10 different streams. +// Processor output is compared against a manual aggregation on a per-stream basis. +// +// Uses Sum datatype for testing, as we are not testing actual aggregation (see +// internal/data for tests), but proper stream separation +func TestAccumulation(t *testing.T) { + proc, sink := setup(t, nil) + + sum := random.Sum() + + // create 10 distinct streams + const N = 10 + sbs := make([]SumBuilder, N) + for i := range sbs { + _, base := sum.Stream() + sbs[i] = SumBuilder{Metric: sum, base: base} + } + + // init manual aggregation state + want := make(map[identity.Stream]data.Number) + for _, s := range sbs { + id := s.id(pmetric.AggregationTemporalityCumulative) + want[id] = s.point(0, 0, 0) + } + + for i := 0; i < 100; i++ { + s := sbs[rand.Intn(N)] + + v := int64(rand.Intn(255)) + ts := pcommon.Timestamp(i) + + // write to processor + in := s.delta(s.point(0, ts, v)) + rms := s.resourceMetrics(in) + err := proc.ConsumeMetrics(context.Background(), rms) + require.NoError(t, err) + + // aggregate manually + wantv := want[s.id(pmetric.AggregationTemporalityCumulative)] + wantv.SetIntValue(wantv.IntValue() + v) + wantv.SetTimestamp(ts) + } + + // get the final processor output for each stream + got := make(map[identity.Stream]data.Number) + for _, md := range sink.AllMetrics() { + metrics.All(md)(func(m metrics.Metric) bool { + sum := metrics.Sum(m) + streams.Datapoints(sum)(func(id identity.Stream, dp data.Number) bool { + got[id] = dp + return true + }) + return true + }) + } + + sort := cmpopts.SortMaps(func(a, b identity.Stream) bool { + return a.Hash().Sum64() < b.Hash().Sum64() + }) + if diff := compare.Diff(want, got, sort); diff != "" { + t.Fatal(diff) + } +} + +// TestTimestamp verifies timestamp handling, most notably: +// - Timestamp() keeps getting advanced +// - StartTimestamp() stays the same +func TestTimestamps(t *testing.T) { + proc, sink := setup(t, nil) + + sb := stream() + point := func(start, last pcommon.Timestamp) data.Number { + return sb.point(start, last, 0) + } + + cases := []struct { + in data.Number + out data.Number + drop bool + }{{ + // first: take as-is + in: point(1000, 1100), + out: point(1000, 1100), + }, { + // subsequent: take, but keep start-ts + in: point(1100, 1200), + out: point(1000, 1200), + }, { + // gap: take + in: point(1300, 1400), + out: point(1000, 1400), + }, { + // out of order + in: point(1200, 1300), + drop: true, + }, { + // older start + in: point(500, 550), + drop: true, + }} + + for i, cs := range cases { + t.Run(strconv.Itoa(i), func(t *testing.T) { + sink.Reset() + + in := sb.resourceMetrics(sb.delta(cs.in)) + want := make([]pmetric.Metrics, 0) + if !cs.drop { + want = []pmetric.Metrics{sb.resourceMetrics(sb.cumul(cs.out))} + } + + err := proc.ConsumeMetrics(context.Background(), in) + require.NoError(t, err) + + out := sink.AllMetrics() + if diff := compare.Diff(want, out); diff != "" { + t.Fatal(diff) + } + }) + } +} + +func TestStreamLimit(t *testing.T) { + proc, sink := setup(t, &self.Config{MaxStale: 5 * time.Minute, MaxStreams: 10}) + + good := make([]SumBuilder, 10) + for i := range good { + good[i] = stream() + } + bad := stream() + _ = bad + + diff := func(want, got []pmetric.Metrics) { + t.Helper() + if diff := compare.Diff(want, got); diff != "" { + t.Fatal(diff) + } + } + + writeGood := func(ts pcommon.Timestamp) { + for i, sb := range good { + in := sb.resourceMetrics(sb.delta(sb.point(0, ts+pcommon.Timestamp(i), 0))) + want := sb.resourceMetrics(sb.cumul(sb.point(0, ts+pcommon.Timestamp(i), 0))) + + err := proc.ConsumeMetrics(context.Background(), in) + require.NoError(t, err) + + diff([]pmetric.Metrics{want}, sink.AllMetrics()) + sink.Reset() + } + } + + // write up to limit must work + writeGood(0) + + // extra stream must be dropped, nothing written + in := bad.resourceMetrics(bad.delta(bad.point(0, 0, 0))) + err := proc.ConsumeMetrics(context.Background(), in) + require.NoError(t, err) + diff([]pmetric.Metrics{}, sink.AllMetrics()) + sink.Reset() + + // writing existing streams must still work + writeGood(100) +} + +type copyable interface { + CopyTo(pmetric.Metric) +} + +func (s SumBuilder) resourceMetrics(metrics ...copyable) pmetric.Metrics { + md := pmetric.NewMetrics() + + rm := md.ResourceMetrics().AppendEmpty() + s.Resource().CopyTo(rm.Resource()) + + sm := rm.ScopeMetrics().AppendEmpty() + s.Scope().CopyTo(sm.Scope()) + + for _, m := range metrics { + m.CopyTo(sm.Metrics().AppendEmpty()) + } + return md +} + +type SumBuilder struct { + random.Metric[data.Number] + base data.Number +} + +func (s SumBuilder) with(dps ...data.Number) pmetric.Metric { + m := pmetric.NewMetric() + s.Metric.CopyTo(m) + + for _, dp := range dps { + dp.NumberDataPoint.CopyTo(m.Sum().DataPoints().AppendEmpty()) + } + + return m +} + +func (s SumBuilder) delta(dps ...data.Number) pmetric.Metric { + m := s.with(dps...) + m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + return m +} + +func (s SumBuilder) cumul(dps ...data.Number) pmetric.Metric { + m := s.with(dps...) + m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + return m +} + +func (s SumBuilder) id(temp pmetric.AggregationTemporality) identity.Stream { + m := s.with(s.base) + m.Sum().SetAggregationTemporality(temp) + + mid := identity.OfMetric(s.Ident().Scope(), m) + return identity.OfStream(mid, s.base) +} + +func (s SumBuilder) point(start, ts pcommon.Timestamp, value int64) data.Number { + dp := s.base.Clone() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(value) + return dp +} + +func stream() SumBuilder { + sum := random.Sum() + _, base := sum.Stream() + return SumBuilder{Metric: sum, base: base} +} From 5c4859eb928cdcc5ffc95b34553275fafd58ef0f Mon Sep 17 00:00:00 2001 From: sh0rez Date: Tue, 3 Sep 2024 15:29:50 +0200 Subject: [PATCH 2/5] telemetry: update FaultsTest --- .../internal/telemetry/faults_test.go | 41 ++++++++++++++----- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go index 6e9540f829c8..444e84767b45 100644 --- a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go +++ b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go @@ -27,9 +27,13 @@ func TestFaults(t *testing.T) { type Case struct { Name string Map Map - Pre func(Map, identity.Stream, data.Number) error - Bad func(Map, identity.Stream, data.Number) error - Err error + // data preparation, etc + Pre func(Map, identity.Stream, data.Number) error + // cause an error + Bad func(Map, identity.Stream, data.Number) error + // expected error that was caused + Err error + // expected return above error was converted into Want error } @@ -49,7 +53,8 @@ func TestFaults(t *testing.T) { dp.SetTimestamp(ts(40)) return dps.Store(id, dp) }, - Err: delta.ErrOlderStart{Start: ts(20), Sample: ts(10)}, + Err: delta.ErrOlderStart{Start: ts(20), Sample: ts(10)}, + Want: streams.Drop, }, { Name: "out-of-order", @@ -61,7 +66,8 @@ func TestFaults(t *testing.T) { dp.SetTimestamp(ts(10)) return dps.Store(id, dp) }, - Err: delta.ErrOutOfOrder{Last: ts(20), Sample: ts(10)}, + Err: delta.ErrOutOfOrder{Last: ts(20), Sample: ts(10)}, + Want: streams.Drop, }, { Name: "gap", @@ -75,7 +81,8 @@ func TestFaults(t *testing.T) { dp.SetTimestamp(ts(40)) return dps.Store(id, dp) }, - Err: delta.ErrGap{From: ts(20), To: ts(30)}, + Err: delta.ErrGap{From: ts(20), To: ts(30)}, + Want: nil, }, { Name: "limit", @@ -110,6 +117,7 @@ func TestFaults(t *testing.T) { return dps.Store(id, dp) }, Err: streams.ErrEvicted{Ident: evid, ErrLimit: streams.ErrLimit(1)}, + Want: nil, }, } @@ -125,17 +133,17 @@ func TestFaults(t *testing.T) { if dps == nil { dps = delta.New[data.Number]() } - onf := telemetry.ObserveNonFatal(dps, &tel.Metrics) + var realErr error + dps = errGrab[data.Number]{Map: dps, err: &realErr} + dps = telemetry.ObserveNonFatal(dps, &tel.Metrics) if c.Pre != nil { - err := c.Pre(onf, id, dp.Clone()) + err := c.Pre(dps, id, dp.Clone()) require.NoError(t, err) } err := c.Bad(dps, id, dp.Clone()) - require.Equal(t, c.Err, err) - - err = c.Bad(onf, id, dp.Clone()) + require.Equal(t, c.Err, realErr) require.Equal(t, c.Want, err) }) } @@ -154,3 +162,14 @@ func (e HeadEvictor[T]) Evict() (evicted identity.Stream, ok bool) { }) return evicted, true } + +// errGrab stores any error that happens on Store() for later inspection +type errGrab[T any] struct { + streams.Map[T] + err *error +} + +func (e errGrab[T]) Store(id identity.Stream, dp T) error { + *e.err = e.Map.Store(id, dp) + return *e.err +} From fd85d6a16c867649b2983a70b5de785f0f3e036c Mon Sep 17 00:00:00 2001 From: sh0rez Date: Tue, 3 Sep 2024 15:34:48 +0200 Subject: [PATCH 3/5] *: make gci --- .../generated_component_telemetry_test.go | 9 ++++----- .../internal/metadata/generated_telemetry.go | 5 ++--- .../internal/metadata/generated_telemetry_test.go | 5 ++--- .../internal/telemetry/faults_test.go | 2 +- 4 files changed, 9 insertions(+), 12 deletions(-) diff --git a/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go b/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go index 092d0fd2e8ba..d5a2813f3a4c 100644 --- a/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go +++ b/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go @@ -7,15 +7,14 @@ import ( "testing" "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel/metric" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processortest" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" ) type componentTestTelemetry struct { diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go index 2308b305e714..80ab2f00224d 100644 --- a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go +++ b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go @@ -5,11 +5,10 @@ package metadata import ( "errors" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/trace" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" ) // Deprecated: [v0.108.0] use LeveledMeter instead. diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go index b6e7395f4d29..0423a149d01b 100644 --- a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go +++ b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go @@ -6,15 +6,14 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/otel/metric" embeddedmetric "go.opentelemetry.io/otel/metric/embedded" noopmetric "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" embeddedtrace "go.opentelemetry.io/otel/trace/embedded" nooptrace "go.opentelemetry.io/otel/trace/noop" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" ) type mockMeter struct { diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go index 444e84767b45..f159ba11dc83 100644 --- a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go +++ b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go @@ -116,7 +116,7 @@ func TestFaults(t *testing.T) { dp.SetTimestamp(ts(20)) return dps.Store(id, dp) }, - Err: streams.ErrEvicted{Ident: evid, ErrLimit: streams.ErrLimit(1)}, + Err: streams.ErrEvicted{Ident: evid, ErrLimit: streams.ErrLimit(1)}, Want: nil, }, } From 8197712a64afd3b8107569808327a24b461bdcae Mon Sep 17 00:00:00 2001 From: sh0rez Date: Tue, 3 Sep 2024 16:31:21 +0200 Subject: [PATCH 4/5] *: make generate --- .../generated_component_telemetry_test.go | 9 +++++---- .../internal/metadata/generated_telemetry.go | 5 +++-- .../internal/metadata/generated_telemetry_test.go | 5 +++-- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go b/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go index d5a2813f3a4c..092d0fd2e8ba 100644 --- a/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go +++ b/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go @@ -7,14 +7,15 @@ import ( "testing" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/processor" - "go.opentelemetry.io/collector/processor/processortest" "go.opentelemetry.io/otel/metric" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processortest" ) type componentTestTelemetry struct { diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go index 80ab2f00224d..2308b305e714 100644 --- a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go +++ b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go @@ -5,10 +5,11 @@ package metadata import ( "errors" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" ) // Deprecated: [v0.108.0] use LeveledMeter instead. diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go index 0423a149d01b..b6e7395f4d29 100644 --- a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go +++ b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go @@ -6,14 +6,15 @@ import ( "testing" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/otel/metric" embeddedmetric "go.opentelemetry.io/otel/metric/embedded" noopmetric "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" embeddedtrace "go.opentelemetry.io/otel/trace/embedded" nooptrace "go.opentelemetry.io/otel/trace/noop" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" ) type mockMeter struct { From 60716f23b4fcc37c2d5c5a5dd9e1d71f626421b6 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Tue, 3 Sep 2024 17:05:52 +0200 Subject: [PATCH 5/5] *: changelog --- .chloggen/deltatocumulative-apitest.yaml | 28 ++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 .chloggen/deltatocumulative-apitest.yaml diff --git a/.chloggen/deltatocumulative-apitest.yaml b/.chloggen/deltatocumulative-apitest.yaml new file mode 100644 index 000000000000..cc5f5fa95774 --- /dev/null +++ b/.chloggen/deltatocumulative-apitest.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: deltatocumulative + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: drop bad samples + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34979] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + removes bad (rejected) samples from output. previously identified and metric-tracked those as such, but didn't actually drop them. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user]