Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics: apply selected changes from upstream #28031

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 30 additions & 13 deletions metrics/ewma.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,14 @@ func (NilEWMA) Update(n int64) {}
type StandardEWMA struct {
uncounted atomic.Int64
alpha float64
rate float64
init bool
rate atomic.Uint64
init atomic.Bool
mutex sync.Mutex
}

// Rate returns the moving average rate of events per second.
func (a *StandardEWMA) Rate() float64 {
a.mutex.Lock()
defer a.mutex.Unlock()
return a.rate * float64(time.Second)
return math.Float64frombits(a.rate.Load()) * float64(time.Second)
}

// Snapshot returns a read-only copy of the EWMA.
Expand All @@ -97,17 +95,36 @@ func (a *StandardEWMA) Snapshot() EWMA {
// Tick ticks the clock to update the moving average. It assumes it is called
// every five seconds.
func (a *StandardEWMA) Tick() {
count := a.uncounted.Load()
a.uncounted.Add(-count)
instantRate := float64(count) / float64(5*time.Second)
// Optimization to avoid mutex locking in the hot-path.
if a.init.Load() {
a.updateRate(a.fetchInstantRate())
return
}
// Slow-path: this is only needed on the first Tick() and preserves transactional updating
// of init and rate in the else block. The first conditional is needed below because
// a different thread could have set a.init = 1 between the time of the first atomic load and when
// the lock was acquired.
a.mutex.Lock()
defer a.mutex.Unlock()
if a.init {
a.rate += a.alpha * (instantRate - a.rate)
if a.init.Load() {
// The fetchInstantRate() uses atomic loading, which is unnecessary in this critical section
// but again, this section is only invoked on the first successful Tick() operation.
a.updateRate(a.fetchInstantRate())
} else {
a.init = true
a.rate = instantRate
a.init.Store(true)
a.rate.Store(math.Float64bits(a.fetchInstantRate()))
}
a.mutex.Unlock()
}

func (a *StandardEWMA) fetchInstantRate() float64 {
count := a.uncounted.Swap(0)
return float64(count) / float64(5*time.Second)
}

func (a *StandardEWMA) updateRate(instantRate float64) {
currentRate := math.Float64frombits(a.rate.Load())
currentRate += a.alpha * (instantRate - currentRate)
a.rate.Store(math.Float64bits(currentRate))
}

// Update adds n uncounted events.
Expand Down
12 changes: 12 additions & 0 deletions metrics/ewma_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ func BenchmarkEWMA(b *testing.B) {
}
}

func BenchmarkEWMAParallel(b *testing.B) {
a := NewEWMA1()
b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
a.Update(1)
a.Tick()
}
})
}

func TestEWMA1(t *testing.T) {
a := NewEWMA1()
a.Update(3)
Expand Down
147 changes: 37 additions & 110 deletions metrics/meter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"math"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -30,17 +31,6 @@ func GetOrRegisterMeter(name string, r Registry) Meter {
return r.GetOrRegister(name, NewMeter).(Meter)
}

// GetOrRegisterMeterForced returns an existing Meter or constructs and registers a
// new StandardMeter no matter the global switch is enabled or not.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func GetOrRegisterMeterForced(name string, r Registry) Meter {
if nil == r {
r = DefaultRegistry
}
return r.GetOrRegister(name, NewMeterForced).(Meter)
}

// NewMeter constructs a new StandardMeter and launches a goroutine.
// Be sure to call Stop() once the meter is of no use to allow for garbage collection.
func NewMeter() Meter {
Expand Down Expand Up @@ -68,83 +58,49 @@ func NewInactiveMeter() Meter {
return m
}

// NewMeterForced constructs a new StandardMeter and launches a goroutine no matter
// the global switch is enabled or not.
// Be sure to call Stop() once the meter is of no use to allow for garbage collection.
func NewMeterForced() Meter {
m := newStandardMeter()
arbiter.Lock()
defer arbiter.Unlock()
arbiter.meters[m] = struct{}{}
if !arbiter.started {
arbiter.started = true
go arbiter.tick()
}
return m
}

// NewRegisteredMeter constructs and registers a new StandardMeter
// and launches a goroutine.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func NewRegisteredMeter(name string, r Registry) Meter {
c := NewMeter()
if nil == r {
r = DefaultRegistry
}
r.Register(name, c)
return c
}

// NewRegisteredMeterForced constructs and registers a new StandardMeter
// and launches a goroutine no matter the global switch is enabled or not.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func NewRegisteredMeterForced(name string, r Registry) Meter {
c := NewMeterForced()
if nil == r {
r = DefaultRegistry
}
r.Register(name, c)
return c
return GetOrRegisterMeter(name, r)
}

// MeterSnapshot is a read-only copy of another Meter.
type MeterSnapshot struct {
temp atomic.Int64
// meterSnapshot is a read-only copy of the meter's internal values.
type meterSnapshot struct {
count int64
rate1, rate5, rate15, rateMean float64
}

// Count returns the count of events at the time the snapshot was taken.
func (m *MeterSnapshot) Count() int64 { return m.count }
func (m *meterSnapshot) Count() int64 { return m.count }

// Mark panics.
func (*MeterSnapshot) Mark(n int64) {
panic("Mark called on a MeterSnapshot")
func (*meterSnapshot) Mark(n int64) {
panic("Mark called on a meterSnapshot")
}

// Rate1 returns the one-minute moving average rate of events per second at the
// time the snapshot was taken.
func (m *MeterSnapshot) Rate1() float64 { return m.rate1 }
func (m *meterSnapshot) Rate1() float64 { return m.rate1 }

// Rate5 returns the five-minute moving average rate of events per second at
// the time the snapshot was taken.
func (m *MeterSnapshot) Rate5() float64 { return m.rate5 }
func (m *meterSnapshot) Rate5() float64 { return m.rate5 }

// Rate15 returns the fifteen-minute moving average rate of events per second
// at the time the snapshot was taken.
func (m *MeterSnapshot) Rate15() float64 { return m.rate15 }
func (m *meterSnapshot) Rate15() float64 { return m.rate15 }

// RateMean returns the meter's mean rate of events per second at the time the
// snapshot was taken.
func (m *MeterSnapshot) RateMean() float64 { return m.rateMean }
func (m *meterSnapshot) RateMean() float64 { return m.rateMean }

// Snapshot returns the snapshot.
func (m *MeterSnapshot) Snapshot() Meter { return m }
func (m *meterSnapshot) Snapshot() Meter { return m }

// Stop is a no-op.
func (m *MeterSnapshot) Stop() {}
func (m *meterSnapshot) Stop() {}

// NilMeter is a no-op Meter.
type NilMeter struct{}
Expand Down Expand Up @@ -175,16 +131,17 @@ func (NilMeter) Stop() {}

// StandardMeter is the standard implementation of a Meter.
type StandardMeter struct {
lock sync.RWMutex
snapshot *MeterSnapshot
count atomic.Int64
temp atomic.Int64
rateMean atomic.Uint64

a1, a5, a15 EWMA
startTime time.Time
stopped atomic.Bool
}

func newStandardMeter() *StandardMeter {
return &StandardMeter{
snapshot: &MeterSnapshot{},
a1: NewEWMA1(),
a5: NewEWMA5(),
a15: NewEWMA15(),
Expand All @@ -194,97 +151,67 @@ func newStandardMeter() *StandardMeter {

// Stop stops the meter, Mark() will be a no-op if you use it after being stopped.
func (m *StandardMeter) Stop() {
stopped := m.stopped.Swap(true)
if !stopped {
if stopped := m.stopped.Swap(true); !stopped {
arbiter.Lock()
delete(arbiter.meters, m)
arbiter.Unlock()
}
}

// Count returns the number of events recorded.
// It updates the meter to be as accurate as possible
func (m *StandardMeter) Count() int64 {
m.lock.Lock()
defer m.lock.Unlock()
m.updateMeter()
return m.snapshot.count
return m.count.Load() + m.temp.Load()
}

// Mark records the occurrence of n events.
func (m *StandardMeter) Mark(n int64) {
m.snapshot.temp.Add(n)
m.temp.Add(n)
}

// Rate1 returns the one-minute moving average rate of events per second.
func (m *StandardMeter) Rate1() float64 {
m.lock.RLock()
defer m.lock.RUnlock()
return m.snapshot.rate1
return m.a1.Rate()
}

// Rate5 returns the five-minute moving average rate of events per second.
func (m *StandardMeter) Rate5() float64 {
m.lock.RLock()
defer m.lock.RUnlock()
return m.snapshot.rate5
return m.a5.Rate()
}

// Rate15 returns the fifteen-minute moving average rate of events per second.
func (m *StandardMeter) Rate15() float64 {
m.lock.RLock()
defer m.lock.RUnlock()
return m.snapshot.rate15
return m.a15.Rate()
}

// RateMean returns the meter's mean rate of events per second.
func (m *StandardMeter) RateMean() float64 {
m.lock.RLock()
defer m.lock.RUnlock()
return m.snapshot.rateMean
return math.Float64frombits(m.rateMean.Load())
}

// Snapshot returns a read-only copy of the meter.
func (m *StandardMeter) Snapshot() Meter {
m.lock.RLock()
snapshot := MeterSnapshot{
count: m.snapshot.count,
rate1: m.snapshot.rate1,
rate5: m.snapshot.rate5,
rate15: m.snapshot.rate15,
rateMean: m.snapshot.rateMean,
return &meterSnapshot{
count: m.count.Load(),
rate1: m.Rate1(),
rate5: m.Rate5(),
rate15: m.Rate15(),
rateMean: m.RateMean(),
}
snapshot.temp.Store(m.snapshot.temp.Load())
m.lock.RUnlock()
return &snapshot
}

func (m *StandardMeter) updateSnapshot() {
// should run with write lock held on m.lock
snapshot := m.snapshot
snapshot.rate1 = m.a1.Rate()
snapshot.rate5 = m.a5.Rate()
snapshot.rate15 = m.a15.Rate()
snapshot.rateMean = float64(snapshot.count) / time.Since(m.startTime).Seconds()
}

func (m *StandardMeter) updateMeter() {
// should only run with write lock held on m.lock
n := m.snapshot.temp.Swap(0)
m.snapshot.count += n
func (m *StandardMeter) tick() {
// Take the uncounted values, add to count
n := m.temp.Swap(0)
count := m.count.Add(n)
m.rateMean.Store(math.Float64bits(float64(count) / time.Since(m.startTime).Seconds()))
// Update the EWMA's internal state
m.a1.Update(n)
m.a5.Update(n)
m.a15.Update(n)
}

func (m *StandardMeter) tick() {
m.lock.Lock()
defer m.lock.Unlock()
m.updateMeter()
// And trigger them to calculate the rates
m.a1.Tick()
m.a5.Tick()
m.a15.Tick()
m.updateSnapshot()
}

// meterArbiter ticks meters every 5s from a single goroutine.
Expand Down
4 changes: 2 additions & 2 deletions metrics/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (t *StandardTimer) Snapshot() Timer {
defer t.mutex.Unlock()
return &TimerSnapshot{
histogram: t.histogram.Snapshot().(*HistogramSnapshot),
meter: t.meter.Snapshot().(*MeterSnapshot),
meter: t.meter.Snapshot().(*meterSnapshot),
}
}

Expand Down Expand Up @@ -249,7 +249,7 @@ func (t *StandardTimer) Variance() float64 {
// TimerSnapshot is a read-only copy of another Timer.
type TimerSnapshot struct {
histogram *HistogramSnapshot
meter *MeterSnapshot
meter *meterSnapshot
}

// Count returns the number of events recorded at the time the snapshot was
Expand Down