Skip to content

Commit

Permalink
Clear all the state after exporting
Browse files Browse the repository at this point in the history
Following the discussion here: open-telemetry#29461 (comment)
  • Loading branch information
RichieSams committed Apr 24, 2024
1 parent 154e7cb commit 5b5ebfb
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 19 deletions.
4 changes: 4 additions & 0 deletions internal/exp/metrics/staleness/staleness.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,7 @@ func (s *Staleness[T]) Evict() identity.Stream {
s.items.Delete(id)
return id
}

// Clear removes every tracked stream from the underlying map.
// NOTE(review): only s.items is cleared here — if Staleness keeps any
// other per-stream state (e.g. an expiry/priority queue), confirm it
// does not also need resetting.
func (s *Staleness[T]) Clear() {
	s.items.Clear()
}
2 changes: 1 addition & 1 deletion internal/exp/metrics/staleness/staleness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestStaleness(t *testing.T) {
max := 1 * time.Second
stalenessMap := NewStaleness[int](
max,
make(streams.HashMap[int]),
&streams.HashMap[int]{},
)

idA := generateStreamID(t, map[string]any{
Expand Down
27 changes: 16 additions & 11 deletions internal/exp/metrics/streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,30 @@ type Map[T any] interface {
Delete(identity.Stream)
Items() func(yield func(identity.Stream, T) bool) bool
Len() int
Clear()
}

var _ Map[any] = HashMap[any](nil)
var _ Map[any] = &HashMap[any]{}

type HashMap[T any] map[identity.Stream]T

func (m HashMap[T]) Load(id identity.Stream) (T, bool) {
v, ok := (map[identity.Stream]T)(m)[id]
// Load returns the value stored for the given stream identity, and a
// boolean reporting whether an entry was present.
func (m *HashMap[T]) Load(id identity.Stream) (T, bool) {
	value, found := (*m)[id]
	return value, found
}

func (m HashMap[T]) Store(id identity.Stream, v T) error {
(map[identity.Stream]T)(m)[id] = v
// Store inserts or replaces the value for the given stream identity.
// This implementation always returns nil; the error return presumably
// exists so fallible Map implementations can share the interface —
// confirm against the streams.Map definition.
func (m *HashMap[T]) Store(id identity.Stream, v T) error {
	(*m)[id] = v
	return nil
}

func (m HashMap[T]) Delete(id identity.Stream) {
delete((map[identity.Stream]T)(m), id)
// Delete removes the entry for the given stream identity.
// Deleting an absent key is a no-op (built-in delete semantics).
func (m *HashMap[T]) Delete(id identity.Stream) {
	delete(*m, id)
}

func (m HashMap[T]) Items() func(yield func(identity.Stream, T) bool) bool {
func (m *HashMap[T]) Items() func(yield func(identity.Stream, T) bool) bool {
return func(yield func(identity.Stream, T) bool) bool {
for id, v := range (map[identity.Stream]T)(m) {
for id, v := range *m {
if !yield(id, v) {
break
}
Expand All @@ -47,8 +48,12 @@ func (m HashMap[T]) Items() func(yield func(identity.Stream, T) bool) bool {
}
}

func (m HashMap[T]) Len() int {
return len((map[identity.Stream]T)(m))
// Len reports the number of streams currently stored in the map.
func (m *HashMap[T]) Len() int {
	return len(*m)
}

// Clear discards all entries by pointing the receiver at a fresh,
// empty map; the previous contents become eligible for garbage
// collection.
func (m *HashMap[T]) Clear() {
	*m = make(map[identity.Stream]T)
}

// Evictors remove the "least important" stream based on some strategy such as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

func New[D data.Point[D]]() Accumulator[D] {
return Accumulator[D]{
Map: make(exp.HashMap[D]),
Map: &exp.HashMap[D]{},
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
func TestLimit(t *testing.T) {
sum := random.Sum()

items := make(exp.HashMap[data.Number])
items := &exp.HashMap[data.Number]{}
lim := streams.Limit(items, 10)

ids := make([]identity.Stream, 10)
Expand Down
5 changes: 5 additions & 0 deletions processor/intervalprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ func (p *Processor) exportMetrics() {
if err := p.nextConsumer.ConsumeMetrics(p.ctx, md); err != nil {
p.logger.Error("Metrics export failed", zap.Error(err))
}

// Clear everything now that we've exported
p.numbers.Clear()
p.histograms.Clear()
p.expHistograms.Clear()
}

func getOrCreateMetric(
Expand Down
18 changes: 13 additions & 5 deletions processor/intervalprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"go.opentelemetry.io/collector/processor/processortest"
)

// TODO: Add tests for the other data types. Ensuring things like: gauges are passed through unchanged, etc.

// TODO: Add tests for data expiration

func TestAggregation(t *testing.T) {
Expand All @@ -28,7 +26,6 @@ func TestAggregation(t *testing.T) {
next []testMetric
outputs []testMetric
}{
// TODO: Add many more test cases for all the edge cases
{
name: "BasicAggregation",
inputs: []testMetric{
Expand Down Expand Up @@ -259,14 +256,25 @@ func TestAggregation(t *testing.T) {
// Pretend we hit the interval timer and call export
processor.exportMetrics()

// Next should have gotten two data sets:
// Processor should now be empty
require.Equal(t, 0, processor.numbers.Len())
require.Equal(t, 0, processor.histograms.Len())
require.Equal(t, 0, processor.expHistograms.Len())

// Exporting again should return nothing
processor.exportMetrics()

// Next should have gotten three data sets:
// 1. Anything left over from ConsumeMetrics()
// 2. Anything exported from exportMetrics()
// 3. An empty entry for the second call to exportMetrics()
allMetrics := next.AllMetrics()
require.Len(t, allMetrics, 2)
require.Len(t, allMetrics, 3)

nextData := convertMetricsToTestData(t, allMetrics[0])
exportData := convertMetricsToTestData(t, allMetrics[1])
secondExportData := convertMetricsToTestData(t, allMetrics[2])
require.Empty(t, secondExportData)

expectedNextData := testMetrics{
{
Expand Down

0 comments on commit 5b5ebfb

Please sign in to comment.