diff --git a/internal/exp/metrics/identity/metric.go b/internal/exp/metrics/identity/metric.go index ceaea90b8e0e..7068bfbdbef5 100644 --- a/internal/exp/metrics/identity/metric.go +++ b/internal/exp/metrics/identity/metric.go @@ -4,14 +4,13 @@ package identity // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" import ( + "fmt" "hash" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" ) -type metric = Metric - type Metric struct { scope @@ -23,21 +22,21 @@ type Metric struct { temporality pmetric.AggregationTemporality } -func (i Metric) Hash() hash.Hash64 { - sum := i.scope.Hash() - sum.Write([]byte(i.name)) - sum.Write([]byte(i.unit)) +func (m Metric) Hash() hash.Hash64 { + sum := m.scope.Hash() + sum.Write([]byte(m.name)) + sum.Write([]byte(m.unit)) var mono byte - if i.monotonic { + if m.monotonic { mono = 1 } - sum.Write([]byte{byte(i.ty), mono, byte(i.temporality)}) + sum.Write([]byte{byte(m.ty), mono, byte(m.temporality)}) return sum } -func (i Metric) Scope() Scope { - return i.scope +func (m Metric) Scope() Scope { + return m.scope } func OfMetric(scope Scope, m pmetric.Metric) Metric { @@ -66,6 +65,10 @@ func OfMetric(scope Scope, m pmetric.Metric) Metric { return id } +func (m Metric) String() string { + return fmt.Sprintf("metric/%x", m.Hash().Sum64()) +} + func OfResourceMetric(res pcommon.Resource, scope pcommon.InstrumentationScope, metric pmetric.Metric) Metric { return OfMetric(OfScope(OfResource(res), scope), metric) } diff --git a/internal/exp/metrics/identity/resource.go b/internal/exp/metrics/identity/resource.go index 990fb71e64ea..7114c45facce 100644 --- a/internal/exp/metrics/identity/resource.go +++ b/internal/exp/metrics/identity/resource.go @@ -4,6 +4,7 @@ package identity // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" import ( + "fmt" "hash" "hash/fnv" @@ -24,6 +25,10 @@ func (r Resource) Hash() hash.Hash64 { return sum } +func (r Resource) String() string { + return fmt.Sprintf("resource/%x", r.Hash().Sum64()) +} + func OfResource(r pcommon.Resource) Resource { return Resource{ attrs: pdatautil.MapHash(r.Attributes()), diff --git a/internal/exp/metrics/identity/scope.go b/internal/exp/metrics/identity/scope.go index db516bc14c79..0ea0b0f15952 100644 --- a/internal/exp/metrics/identity/scope.go +++ b/internal/exp/metrics/identity/scope.go @@ -4,6 +4,7 @@ package identity // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" import ( + "fmt" "hash" "go.opentelemetry.io/collector/pdata/pcommon" @@ -33,6 +34,10 @@ func (s Scope) Resource() Resource { return s.resource } +func (s Scope) String() string { + return fmt.Sprintf("scope/%x", s.Hash().Sum64()) +} + func OfScope(res Resource, scope pcommon.InstrumentationScope) Scope { return Scope{ resource: res, diff --git a/internal/exp/metrics/identity/stream.go b/internal/exp/metrics/identity/stream.go index 19988f7730dc..c29af83edfaa 100644 --- a/internal/exp/metrics/identity/stream.go +++ b/internal/exp/metrics/identity/stream.go @@ -4,6 +4,7 @@ package identity // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" import ( + "fmt" "hash" "go.opentelemetry.io/collector/pdata/pcommon" @@ -12,18 +13,22 @@ import ( ) type Stream struct { - metric - attrs [16]byte + metric Metric + attrs [16]byte } -func (i Stream) Hash() hash.Hash64 { - sum := i.metric.Hash() - sum.Write(i.attrs[:]) +func (s Stream) Hash() hash.Hash64 { + sum := s.metric.Hash() + sum.Write(s.attrs[:]) return sum } -func (i Stream) Metric() Metric { - return i.metric +func (s Stream) Metric() Metric { + return s.metric +} + +func (s Stream) String() string { + return fmt.Sprintf("stream/%x", s.Hash().Sum64()) } func OfStream[DataPoint attrPoint](m Metric, dp DataPoint) Stream { diff --git a/internal/exp/metrics/identity/strings.go b/internal/exp/metrics/identity/strings.go deleted file mode 100644 index 7339f95a578c..000000000000 --- a/internal/exp/metrics/identity/strings.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package identity // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" - -import ( - "fmt" -) - -func (r Resource) String() string { - return fmt.Sprintf("resource/%x", r.Hash().Sum64()) -} - -func (s Scope) String() string { - return fmt.Sprintf("scope/%x", s.Hash().Sum64()) -} - -func (m Metric) String() string { - return fmt.Sprintf("metric/%x", m.Hash().Sum64()) -} - -func (s Stream) String() string { - return fmt.Sprintf("stream/%x", s.Hash().Sum64()) -} diff --git a/internal/exp/metrics/metrics.go b/internal/exp/metrics/metrics.go index cb931225043e..affdf76fa77f 100644 --- a/internal/exp/metrics/metrics.go +++ b/internal/exp/metrics/metrics.go @@ -4,10 +4,10 @@ package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics" import ( + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" ) // Merge will merge the metrics data in mdB into mdA, then return mdA. @@ -111,7 +111,7 @@ outer: return smA } -func mergeDataPoints[DPS streams.DataPointSlice[DP], DP streams.DataPoint[DP]](dataPointsA DPS, dataPointsB DPS) DPS { +func mergeDataPoints[DPS dataPointSlice[DP], DP dataPoint[DP]](dataPointsA DPS, dataPointsB DPS) DPS { // Append all the datapoints from B to A for i := 0; i < dataPointsB.Len(); i++ { dpB := dataPointsB.At(i) @@ -122,3 +122,15 @@ func mergeDataPoints[DPS streams.DataPointSlice[DP], DP streams.DataPoint[DP]](d return dataPointsA } + +type dataPointSlice[DP dataPoint[DP]] interface { + Len() int + At(i int) DP + AppendEmpty() DP +} + +type dataPoint[Self any] interface { + Timestamp() pcommon.Timestamp + Attributes() pcommon.Map + CopyTo(dest Self) +} diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go deleted file mode 100644 index 895174422878..000000000000 --- a/internal/exp/metrics/staleness/staleness.go +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" - -import ( - "time" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" -) - -// We override how Now() is returned, so we can have deterministic tests -var NowFunc = time.Now - -var ( - _ streams.Map[any] = (*Staleness[any])(nil) - _ streams.Evictor = (*Staleness[any])(nil) -) - -// Staleness a a wrapper over a map that adds an additional "staleness" value to each entry. Users can -// call ExpireOldEntries() to automatically remove all entries from the map whole staleness value is -// older than the `max` -// -// NOTE: Staleness methods are *not* thread-safe. If the user needs to use Staleness in a multi-threaded -// environment, then it is the user's responsibility to properly serialize calls to Staleness methods -type Staleness[T any] struct { - Max time.Duration - - items streams.Map[T] - pq PriorityQueue -} - -func NewStaleness[T any](maxDuration time.Duration, items streams.Map[T]) *Staleness[T] { - return &Staleness[T]{ - Max: maxDuration, - - items: items, - pq: NewPriorityQueue(), - } -} - -// Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value -func (s *Staleness[T]) Load(id identity.Stream) (T, bool) { - return s.items.Load(id) -} - -// Store the given key value pair in the map, and update the pair's staleness value to "now" -func (s *Staleness[T]) Store(id identity.Stream, v T) error { - s.pq.Update(id, NowFunc()) - return s.items.Store(id, v) -} - -func (s *Staleness[T]) Delete(id identity.Stream) { - s.items.Delete(id) -} - -// Items returns an iterator function that in future go version can be used with range -// See: https://go.dev/wiki/RangefuncExperiment -func (s *Staleness[T]) Items() func(yield func(identity.Stream, T) bool) bool { - return s.items.Items() -} - -// ExpireOldEntries will remove all entries whose staleness value is older than `now() - max` -// For example, if an entry has a staleness value of two hours ago, and max == 1 hour, then the entry would -// be removed. But if an entry had a stalness value of 30 minutes, then it *wouldn't* be removed. -func (s *Staleness[T]) ExpireOldEntries() { - now := NowFunc() - for { - if s.Len() == 0 { - return - } - _, ts := s.pq.Peek() - if now.Sub(ts) < s.Max { - break - } - id, _ := s.pq.Pop() - s.items.Delete(id) - } -} - -func (s *Staleness[T]) Len() int { - return s.items.Len() -} - -func (s *Staleness[T]) Next() time.Time { - _, ts := s.pq.Peek() - return ts -} - -func (s *Staleness[T]) Evict() (identity.Stream, bool) { - _, ts := s.pq.Peek() - if NowFunc().Sub(ts) < s.Max { - return identity.Stream{}, false - } - - id, _ := s.pq.Pop() - s.items.Delete(id) - return id, true -} - -func (s *Staleness[T]) Clear() { - s.items.Clear() -} - -type Tracker struct { - pq PriorityQueue -} - -func NewTracker() Tracker { - return Tracker{pq: NewPriorityQueue()} -} - -func (stale Tracker) Refresh(ts time.Time, ids ...identity.Stream) { - for _, id := range ids { - stale.pq.Update(id, ts) - } -} - -func (stale Tracker) Collect(maxDuration time.Duration) []identity.Stream { - now := NowFunc() - - var ids []identity.Stream - for stale.pq.Len() > 0 { - _, ts := stale.pq.Peek() - if now.Sub(ts) < maxDuration { - break - } - id, _ := stale.pq.Pop() - ids = append(ids, id) - } - - return ids -} diff --git a/internal/exp/metrics/staleness/staleness_test.go b/internal/exp/metrics/staleness/staleness_test.go deleted file mode 100644 index 24de1853c99f..000000000000 --- a/internal/exp/metrics/staleness/staleness_test.go +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package staleness - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" -) - -func TestStaleness(t *testing.T) { - stalenessMap := NewStaleness[int]( - 1*time.Second, - make(streams.HashMap[int]), - ) - - idA := generateStreamID(t, map[string]any{ - "aaa": "123", - }) - idB := generateStreamID(t, map[string]any{ - "bbb": "456", - }) - idC := generateStreamID(t, map[string]any{ - "ccc": "789", - }) - idD := generateStreamID(t, map[string]any{ - "ddd": "024", - }) - - initialTime := time.Time{} - timeA := initialTime.Add(2 * time.Second) - timeB := initialTime.Add(1 * time.Second) - timeC := initialTime.Add(3 * time.Second) - timeD := initialTime.Add(4 * time.Second) - - valueA := 1 - valueB := 4 - valueC := 7 - valueD := 0 - - // Add the values to the map - NowFunc = func() time.Time { return timeA } - _ = stalenessMap.Store(idA, valueA) - NowFunc = func() time.Time { return timeB } - _ = stalenessMap.Store(idB, valueB) - NowFunc = func() time.Time { return timeC } - _ = stalenessMap.Store(idC, valueC) - NowFunc = func() time.Time { return timeD } - _ = stalenessMap.Store(idD, valueD) - - // Set the time to 2.5s and run expire - // This should remove B, but the others should remain - // (now == 2.5s, B == 1s, max == 1s) - // now > B + max - NowFunc = func() time.Time { return initialTime.Add(2500 * time.Millisecond) } - stalenessMap.ExpireOldEntries() - validateStalenessMapEntries(t, - map[identity.Stream]int{ - idA: valueA, - idC: valueC, - idD: valueD, - }, - stalenessMap, - ) - - // Set the time to 4.5s and run expire - // This should remove A and C, but D should remain - // (now == 2.5s, A == 2s, C == 3s, max == 1s) - // now > A + max AND now > C + max - NowFunc = func() time.Time { return initialTime.Add(4500 * time.Millisecond) } - stalenessMap.ExpireOldEntries() - validateStalenessMapEntries(t, - map[identity.Stream]int{ - idD: valueD, - }, - stalenessMap, - ) -} - -func validateStalenessMapEntries(t *testing.T, expected map[identity.Stream]int, sm *Staleness[int]) { - actual := map[identity.Stream]int{} - - sm.Items()(func(key identity.Stream, value int) bool { - actual[key] = value - return true - }) - require.Equal(t, expected, actual) -} - -func TestEvict(t *testing.T) { - now := 0 - NowFunc = func() time.Time { - return time.Unix(int64(now), 0) - } - - stale := NewStaleness(1*time.Minute, make(streams.HashMap[int])) - - now = 10 - idA := generateStreamID(t, map[string]any{"aaa": "123"}) - err := stale.Store(idA, 0) - require.NoError(t, err) - - now = 20 - idB := generateStreamID(t, map[string]any{"bbb": "456"}) - err = stale.Store(idB, 1) - require.NoError(t, err) - - require.Equal(t, 2, stale.Len()) - - // nothing stale yet, must not evict - _, ok := stale.Evict() - require.False(t, ok) - require.Equal(t, 2, stale.Len()) - - // idA stale - now = 71 - gone, ok := stale.Evict() - require.True(t, ok) - require.NotZero(t, gone) - require.Equal(t, 1, stale.Len()) - - // idB not yet stale - _, ok = stale.Evict() - require.False(t, ok) - require.Equal(t, 1, stale.Len()) -} diff --git a/internal/exp/metrics/staleness/tracker.go b/internal/exp/metrics/staleness/tracker.go new file mode 100644 index 000000000000..68676e19b0f2 --- /dev/null +++ b/internal/exp/metrics/staleness/tracker.go @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" + +import ( + "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +type Tracker struct { + pq PriorityQueue +} + +func NewTracker() Tracker { + return Tracker{pq: NewPriorityQueue()} +} + +func (tr Tracker) Refresh(ts time.Time, ids ...identity.Stream) { + for _, id := range ids { + tr.pq.Update(id, ts) + } +} + +func (tr Tracker) Collect(maxDuration time.Duration) []identity.Stream { + now := time.Now() + + var ids []identity.Stream + for tr.pq.Len() > 0 { + _, ts := tr.pq.Peek() + if now.Sub(ts) < maxDuration { + break + } + id, _ := tr.pq.Pop() + ids = append(ids, id) + } + + return ids +} diff --git a/internal/exp/metrics/streams/streams.go b/internal/exp/metrics/streams/streams.go deleted file mode 100644 index 5f0d715b6962..000000000000 --- a/internal/exp/metrics/streams/streams.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" - -import ( - "go.opentelemetry.io/collector/pdata/pcommon" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" -) - -// Sequence of streams that can be iterated upon -type Seq[T any] func(yield func(identity.Stream, T) bool) bool - -// Map defines a collection of items tracked by a stream-id and the operations -// on it -type Map[T any] interface { - Load(identity.Stream) (T, bool) - Store(identity.Stream, T) error - Delete(identity.Stream) - Items() func(yield func(identity.Stream, T) bool) bool - Len() int - Clear() -} - -var _ Map[any] = HashMap[any](nil) - -type HashMap[T any] map[identity.Stream]T - -func (m HashMap[T]) Load(id identity.Stream) (T, bool) { - v, ok := (map[identity.Stream]T)(m)[id] - return v, ok -} - -func (m HashMap[T]) Store(id identity.Stream, v T) error { - (map[identity.Stream]T)(m)[id] = v - return nil -} - -func (m HashMap[T]) Delete(id identity.Stream) { - delete((map[identity.Stream]T)(m), id) -} - -func (m HashMap[T]) Items() func(yield func(identity.Stream, T) bool) bool { - return func(yield func(identity.Stream, T) bool) bool { - for id, v := range (map[identity.Stream]T)(m) { - if !yield(id, v) { - break - } - } - return false - } -} - -func (m HashMap[T]) Len() int { - return len((map[identity.Stream]T)(m)) -} - -func (m HashMap[T]) Clear() { - clear(m) -} - -// Evictors remove the "least important" stream based on some strategy such as -// the oldest, least active, etc. -// -// Returns whether a stream was evicted and if so the now gone stream id -type Evictor interface { - Evict() (gone identity.Stream, ok bool) -} - -type DataPointSlice[DP DataPoint[DP]] interface { - Len() int - At(i int) DP - AppendEmpty() DP -} - -type DataPoint[Self any] interface { - Timestamp() pcommon.Timestamp - Attributes() pcommon.Map - CopyTo(dest Self) -}