Skip to content

Commit

Permalink
Move global random number generator to randRes field (#5819)
Browse files Browse the repository at this point in the history
Instead of using a global random number generator for all `randRes`
values, have each value use its own. This removes the need for locking
and managing concurrency-safe access to the global. Also, because the
`Reservoir` type is not concurrency safe and the metric pipeline already
guards access to it, the new field does not need a `sync.Mutex`.

Supersedes #5815 
Fix #5814

### Performance Analysis

Based on the existing benchmarks, this change has approximately
equivalent performance to the existing code.

```terminal
goos: linux
goarch: amd64
pkg: go.opentelemetry.io/otel/sdk/metric
cpu: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz
                           │   old.txt   │              new.txt               │
                           │   sec/op    │   sec/op     vs base               │
Exemplars/Int64Counter/8-8   14.00µ ± 3%   13.44µ ± 4%  -3.98% (p=0.001 n=10)

                           │   old.txt    │             new.txt              │
                           │     B/op     │     B/op      vs base            │
Exemplars/Int64Counter/8-8   3.791Ki ± 0%   3.791Ki ± 0%  ~ (p=1.000 n=10) ¹
¹ all samples are equal

                           │  old.txt   │            new.txt             │
                           │ allocs/op  │ allocs/op   vs base            │
Exemplars/Int64Counter/8-8   84.00 ± 0%   84.00 ± 0%  ~ (p=1.000 n=10) ¹
¹ all samples are equal
```
  • Loading branch information
MrAlias authored Sep 16, 2024
1 parent a7e83aa commit 42fd8fe
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 58 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `Logger.Enabled` in `go.opentelemetry.io/otel/log` now accepts a newly introduced `EnabledParameters` type instead of `Record`. (#5791)
- `FilterProcessor.Enabled` in `go.opentelemetry.io/otel/sdk/log/internal/x` now accepts `EnabledParameters` instead of `Record`. (#5791)

### Fixed

- The race condition for multiple `FixedSize` exemplar reservoirs identified in #5814 is resolved. (#5819)

<!-- Released section -->
<!-- Don't change this section unless doing release -->

Expand Down
63 changes: 63 additions & 0 deletions sdk/metric/exemplar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"runtime"
"sync"
"testing"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// TestFixedSizeExemplarConcurrentSafe records measurements on two counters
// from many goroutines while the reader collects, and fails if any of those
// operations panic.
func TestFixedSizeExemplarConcurrentSafe(t *testing.T) {
	// Tests https://github.com/open-telemetry/opentelemetry-go/issues/5814

	t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "always_on")

	r := NewManualReader()
	m := NewMeterProvider(WithReader(r)).Meter("exemplar-concurrency")
	// Use two instruments to get concurrent access to any shared globals.
	i0, err := m.Int64Counter("counter.0")
	require.NoError(t, err)
	i1, err := m.Int64Counter("counter.1")
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	// Stop and wait for every worker on all exit paths. A failed require
	// below ends the test via runtime.Goexit, which still runs this defer, so
	// no goroutine outlives the test or reports on a finished *testing.T.
	defer func() {
		cancel()
		wg.Wait()
	}()

	add := func() {
		i0.Add(ctx, 1)
		i1.Add(ctx, 2)
	}

	goRoutines := max(10, runtime.NumCPU())
	for n := 0; n < goRoutines; n++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Report panics with t.Errorf rather than require: require calls
			// t.FailNow, which must only be called from the goroutine running
			// the test function.
			defer func() {
				if p := recover(); p != nil {
					t.Errorf("panic while recording measurements: %v", p)
				}
			}()
			for ctx.Err() == nil {
				add()
			}
		}()
	}

	const collections = 100
	var rm metricdata.ResourceMetrics
	for c := 0; c < collections; c++ {
		// Only panics are of interest here; the Collect error is
		// intentionally ignored.
		require.NotPanics(t, func() { _ = r.Collect(ctx, &rm) })
	}
}
81 changes: 40 additions & 41 deletions sdk/metric/internal/exemplar/rand.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,50 @@ import (
"context"
"math"
"math/rand"
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
)

var (
// FixedSize returns a [Reservoir] that samples at most k exemplars. If k or
// fewer measurements are made, the Reservoir will sample each one. If more
// than k are made, the Reservoir will randomly sample each additional
// measurement with a decreasing probability.
func FixedSize(k int) Reservoir {
return newRandRes(newStorage(k))
}

type randRes struct {
*storage

// count is the number of measurement seen.
count int64
// next is the next count that will store a measurement at a random index
// once the reservoir has been filled.
next int64
// w is the largest random number in a distribution that is used to compute
// the next next.
w float64

// rng is used to make sampling decisions.
//
// Do not use crypto/rand. There is no reason for the decrease in performance
// given this is not a security sensitive decision.
rng = rand.New(rand.NewSource(time.Now().UnixNano()))
// Ensure concurrent safe access to rng and its underlying source.
rngMu sync.Mutex
)
rng *rand.Rand
}

// random returns, as a float64, a uniform pseudo-random number in the open
// interval (0.0,1.0).
func random() float64 {
// newRandRes returns a randRes that stores its measurements in s. Each
// returned value gets its own random number generator, seeded from the current
// time in nanoseconds, rather than sharing a package-level one. reset is
// called on the value before it is returned.
func newRandRes(s *storage) *randRes {
r := &randRes{
storage: s,
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
}
r.reset()
return r
}

// randomFloat64 returns, as a float64, a uniform pseudo-random number in the
// open interval (0.0,1.0).
func (r *randRes) randomFloat64() float64 {
// TODO: This does not return a uniform number. rng.Float64 returns a
// uniformly random int in [0,2^53) that is divided by 2^53. Meaning it
// returns multiples of 2^-53, and not all floating point numbers between 0
Expand All @@ -43,39 +68,13 @@ func random() float64 {
//
// There are likely many other methods to explore here as well.

rngMu.Lock()
defer rngMu.Unlock()

f := rng.Float64()
f := r.rng.Float64()
for f == 0 {
f = rng.Float64()
f = r.rng.Float64()
}
return f
}

// FixedSize returns a [Reservoir] that samples at most k exemplars. If there
// are k or less measurements made, the Reservoir will sample each one. If
// there are more than k, the Reservoir will then randomly sample all
// additional measurement with a decreasing probability.
func FixedSize(k int) Reservoir {
r := &randRes{storage: newStorage(k)}
r.reset()
return r
}

type randRes struct {
*storage

// count is the number of measurement seen.
count int64
// next is the next count that will store a measurement at a random index
// once the reservoir has been filled.
next int64
// w is the largest random number in a distribution that is used to compute
// the next next.
w float64
}

func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) {
// The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December
// 1994). "Reservoir-Sampling Algorithms of Time Complexity
Expand Down Expand Up @@ -123,7 +122,7 @@ func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute
} else {
if r.count == r.next {
// Overwrite a random existing measurement with the one offered.
idx := int(rng.Int63n(int64(cap(r.store))))
idx := int(r.rng.Int63n(int64(cap(r.store))))
r.store[idx] = newMeasurement(ctx, t, n, a)
r.advance()
}
Expand All @@ -147,7 +146,7 @@ func (r *randRes) reset() {
// This maps the uniform random number in (0,1) to a geometric distribution
// over the same interval. The mean of the distribution is inversely
// proportional to the storage capacity.
r.w = math.Exp(math.Log(random()) / float64(cap(r.store)))
r.w = math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.store)))

r.advance()
}
Expand All @@ -167,7 +166,7 @@ func (r *randRes) advance() {
// therefore the next r.w will be based on the same distribution (i.e.
// `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by
// computing the next random number `u` and take r.w as `w * u^(1/k)`.
r.w *= math.Exp(math.Log(random()) / float64(cap(r.store)))
r.w *= math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.store)))
// Use the new random number in the series to calculate the count of the
// next measurement that will be stored.
//
Expand All @@ -178,7 +177,7 @@ func (r *randRes) advance() {
//
// Important to note, the new r.next will always be at least 1 more than
// the last r.next.
r.next += int64(math.Log(random())/math.Log(1-r.w)) + 1
r.next += int64(math.Log(r.randomFloat64())/math.Log(1-r.w)) + 1
}

func (r *randRes) Collect(dest *[]Exemplar) {
Expand Down
22 changes: 5 additions & 17 deletions sdk/metric/internal/exemplar/rand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ package exemplar
import (
"context"
"math"
"math/rand"
"slices"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -27,10 +28,12 @@ func TestFixedSizeSamplingCorrectness(t *testing.T) {
intensity := 0.1
sampleSize := 1000

rng := rand.New(rand.NewSource(time.Now().UnixNano()))

data := make([]float64, sampleSize*1000)
for i := range data {
// Generate exponentially distributed data.
data[i] = (-1.0 / intensity) * math.Log(random())
data[i] = (-1.0 / intensity) * math.Log(rng.Float64())
}
// Sort to test position bias.
slices.Sort(data)
Expand All @@ -50,18 +53,3 @@ func TestFixedSizeSamplingCorrectness(t *testing.T) {
// ensuring no bias in our random sampling algorithm.
assert.InDelta(t, 1/mean, intensity, 0.02) // Within 5σ.
}

func TestRandomConcurrentSafe(t *testing.T) {
const goRoutines = 10

var wg sync.WaitGroup
for n := 0; n < goRoutines; n++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = random()
}()
}

wg.Wait()
}

0 comments on commit 42fd8fe

Please sign in to comment.