diff --git a/metrics/ewma.go b/metrics/ewma.go index ed95cba19b4f..ffe4fe83a54d 100644 --- a/metrics/ewma.go +++ b/metrics/ewma.go @@ -77,16 +77,14 @@ func (NilEWMA) Update(n int64) {} type StandardEWMA struct { uncounted atomic.Int64 alpha float64 - rate float64 - init bool + rate atomic.Uint64 + init atomic.Bool mutex sync.Mutex } // Rate returns the moving average rate of events per second. func (a *StandardEWMA) Rate() float64 { - a.mutex.Lock() - defer a.mutex.Unlock() - return a.rate * float64(time.Second) + return math.Float64frombits(a.rate.Load()) * float64(time.Second) } // Snapshot returns a read-only copy of the EWMA. @@ -97,17 +95,36 @@ func (a *StandardEWMA) Snapshot() EWMA { // Tick ticks the clock to update the moving average. It assumes it is called // every five seconds. func (a *StandardEWMA) Tick() { - count := a.uncounted.Load() - a.uncounted.Add(-count) - instantRate := float64(count) / float64(5*time.Second) + // Optimization to avoid mutex locking in the hot-path. + if a.init.Load() { + a.updateRate(a.fetchInstantRate()) + return + } + // Slow-path: this is only needed on the first Tick() and preserves transactional updating + // of init and rate in the else block. The first conditional is needed below because + // a different thread could have set a.init = 1 between the time of the first atomic load and when + // the lock was acquired. a.mutex.Lock() - defer a.mutex.Unlock() - if a.init { - a.rate += a.alpha * (instantRate - a.rate) + if a.init.Load() { + // The fetchInstantRate() uses atomic loading, which is unnecessary in this critical section + // but again, this section is only invoked on the first successful Tick() operation. + a.updateRate(a.fetchInstantRate()) } else { - a.init = true - a.rate = instantRate + a.init.Store(true) + a.rate.Store(math.Float64bits(a.fetchInstantRate())) } + a.mutex.Unlock() +} + +func (a *StandardEWMA) fetchInstantRate() float64 { + count := a.uncounted.Swap(0) + return float64(count) / float64(5*time.Second) +} + +func (a *StandardEWMA) updateRate(instantRate float64) { + currentRate := math.Float64frombits(a.rate.Load()) + currentRate += a.alpha * (instantRate - currentRate) + a.rate.Store(math.Float64bits(currentRate)) } // Update adds n uncounted events. diff --git a/metrics/ewma_test.go b/metrics/ewma_test.go index 5b244191616e..73e77f852f76 100644 --- a/metrics/ewma_test.go +++ b/metrics/ewma_test.go @@ -14,6 +14,18 @@ func BenchmarkEWMA(b *testing.B) { } } +func BenchmarkEWMAParallel(b *testing.B) { + a := NewEWMA1() + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + a.Update(1) + a.Tick() + } + }) +} + func TestEWMA1(t *testing.T) { a := NewEWMA1() a.Update(3) diff --git a/metrics/meter.go b/metrics/meter.go index 8a89dc4275f1..25e08d3629ce 100644 --- a/metrics/meter.go +++ b/metrics/meter.go @@ -1,6 +1,7 @@ package metrics import ( + "math" "sync" "sync/atomic" "time" @@ -30,17 +31,6 @@ func GetOrRegisterMeter(name string, r Registry) Meter { return r.GetOrRegister(name, NewMeter).(Meter) } -// GetOrRegisterMeterForced returns an existing Meter or constructs and registers a -// new StandardMeter no matter the global switch is enabled or not. -// Be sure to unregister the meter from the registry once it is of no use to -// allow for garbage collection. -func GetOrRegisterMeterForced(name string, r Registry) Meter { - if nil == r { - r = DefaultRegistry - } - return r.GetOrRegister(name, NewMeterForced).(Meter) -} - // NewMeter constructs a new StandardMeter and launches a goroutine. // Be sure to call Stop() once the meter is of no use to allow for garbage collection. func NewMeter() Meter { @@ -68,83 +58,49 @@ func NewInactiveMeter() Meter { return m } -// NewMeterForced constructs a new StandardMeter and launches a goroutine no matter -// the global switch is enabled or not. -// Be sure to call Stop() once the meter is of no use to allow for garbage collection. -func NewMeterForced() Meter { - m := newStandardMeter() - arbiter.Lock() - defer arbiter.Unlock() - arbiter.meters[m] = struct{}{} - if !arbiter.started { - arbiter.started = true - go arbiter.tick() - } - return m -} - // NewRegisteredMeter constructs and registers a new StandardMeter // and launches a goroutine. // Be sure to unregister the meter from the registry once it is of no use to // allow for garbage collection. func NewRegisteredMeter(name string, r Registry) Meter { - c := NewMeter() - if nil == r { - r = DefaultRegistry - } - r.Register(name, c) - return c -} - -// NewRegisteredMeterForced constructs and registers a new StandardMeter -// and launches a goroutine no matter the global switch is enabled or not. -// Be sure to unregister the meter from the registry once it is of no use to -// allow for garbage collection. -func NewRegisteredMeterForced(name string, r Registry) Meter { - c := NewMeterForced() - if nil == r { - r = DefaultRegistry - } - r.Register(name, c) - return c + return GetOrRegisterMeter(name, r) } -// MeterSnapshot is a read-only copy of another Meter. -type MeterSnapshot struct { - temp atomic.Int64 +// meterSnapshot is a read-only copy of the meter's internal values. +type meterSnapshot struct { count int64 rate1, rate5, rate15, rateMean float64 } // Count returns the count of events at the time the snapshot was taken. -func (m *MeterSnapshot) Count() int64 { return m.count } +func (m *meterSnapshot) Count() int64 { return m.count } // Mark panics. -func (*MeterSnapshot) Mark(n int64) { - panic("Mark called on a MeterSnapshot") +func (*meterSnapshot) Mark(n int64) { + panic("Mark called on a meterSnapshot") } // Rate1 returns the one-minute moving average rate of events per second at the // time the snapshot was taken. -func (m *MeterSnapshot) Rate1() float64 { return m.rate1 } +func (m *meterSnapshot) Rate1() float64 { return m.rate1 } // Rate5 returns the five-minute moving average rate of events per second at // the time the snapshot was taken. -func (m *MeterSnapshot) Rate5() float64 { return m.rate5 } +func (m *meterSnapshot) Rate5() float64 { return m.rate5 } // Rate15 returns the fifteen-minute moving average rate of events per second // at the time the snapshot was taken. -func (m *MeterSnapshot) Rate15() float64 { return m.rate15 } +func (m *meterSnapshot) Rate15() float64 { return m.rate15 } // RateMean returns the meter's mean rate of events per second at the time the // snapshot was taken. -func (m *MeterSnapshot) RateMean() float64 { return m.rateMean } +func (m *meterSnapshot) RateMean() float64 { return m.rateMean } // Snapshot returns the snapshot. -func (m *MeterSnapshot) Snapshot() Meter { return m } +func (m *meterSnapshot) Snapshot() Meter { return m } // Stop is a no-op. -func (m *MeterSnapshot) Stop() {} +func (m *meterSnapshot) Stop() {} // NilMeter is a no-op Meter. type NilMeter struct{} @@ -175,8 +131,10 @@ func (NilMeter) Stop() {} // StandardMeter is the standard implementation of a Meter. type StandardMeter struct { - lock sync.RWMutex - snapshot *MeterSnapshot + count atomic.Int64 + temp atomic.Int64 + rateMean atomic.Uint64 + a1, a5, a15 EWMA startTime time.Time stopped atomic.Bool @@ -184,7 +142,6 @@ type StandardMeter struct { func newStandardMeter() *StandardMeter { return &StandardMeter{ - snapshot: &MeterSnapshot{}, a1: NewEWMA1(), a5: NewEWMA5(), a15: NewEWMA15(), @@ -194,8 +151,7 @@ func newStandardMeter() *StandardMeter { // Stop stops the meter, Mark() will be a no-op if you use it after being stopped. func (m *StandardMeter) Stop() { - stopped := m.stopped.Swap(true) - if !stopped { + if stopped := m.stopped.Swap(true); !stopped { arbiter.Lock() delete(arbiter.meters, m) arbiter.Unlock() @@ -203,88 +159,59 @@ func (m *StandardMeter) Stop() { } // Count returns the number of events recorded. -// It updates the meter to be as accurate as possible func (m *StandardMeter) Count() int64 { - m.lock.Lock() - defer m.lock.Unlock() - m.updateMeter() - return m.snapshot.count + return m.count.Load() + m.temp.Load() } // Mark records the occurrence of n events. func (m *StandardMeter) Mark(n int64) { - m.snapshot.temp.Add(n) + m.temp.Add(n) } // Rate1 returns the one-minute moving average rate of events per second. func (m *StandardMeter) Rate1() float64 { - m.lock.RLock() - defer m.lock.RUnlock() - return m.snapshot.rate1 + return m.a1.Rate() } // Rate5 returns the five-minute moving average rate of events per second. func (m *StandardMeter) Rate5() float64 { - m.lock.RLock() - defer m.lock.RUnlock() - return m.snapshot.rate5 + return m.a5.Rate() } // Rate15 returns the fifteen-minute moving average rate of events per second. func (m *StandardMeter) Rate15() float64 { - m.lock.RLock() - defer m.lock.RUnlock() - return m.snapshot.rate15 + return m.a15.Rate() } // RateMean returns the meter's mean rate of events per second. func (m *StandardMeter) RateMean() float64 { - m.lock.RLock() - defer m.lock.RUnlock() - return m.snapshot.rateMean + return math.Float64frombits(m.rateMean.Load()) } // Snapshot returns a read-only copy of the meter. func (m *StandardMeter) Snapshot() Meter { - m.lock.RLock() - snapshot := MeterSnapshot{ - count: m.snapshot.count, - rate1: m.snapshot.rate1, - rate5: m.snapshot.rate5, - rate15: m.snapshot.rate15, - rateMean: m.snapshot.rateMean, + return &meterSnapshot{ + count: m.count.Load(), + rate1: m.Rate1(), + rate5: m.Rate5(), + rate15: m.Rate15(), + rateMean: m.RateMean(), } - snapshot.temp.Store(m.snapshot.temp.Load()) - m.lock.RUnlock() - return &snapshot -} - -func (m *StandardMeter) updateSnapshot() { - // should run with write lock held on m.lock - snapshot := m.snapshot - snapshot.rate1 = m.a1.Rate() - snapshot.rate5 = m.a5.Rate() - snapshot.rate15 = m.a15.Rate() - snapshot.rateMean = float64(snapshot.count) / time.Since(m.startTime).Seconds() } -func (m *StandardMeter) updateMeter() { - // should only run with write lock held on m.lock - n := m.snapshot.temp.Swap(0) - m.snapshot.count += n +func (m *StandardMeter) tick() { + // Take the uncounted values, add to count + n := m.temp.Swap(0) + count := m.count.Add(n) + m.rateMean.Store(math.Float64bits(float64(count) / time.Since(m.startTime).Seconds())) + // Update the EWMA's internal state m.a1.Update(n) m.a5.Update(n) m.a15.Update(n) -} - -func (m *StandardMeter) tick() { - m.lock.Lock() - defer m.lock.Unlock() - m.updateMeter() + // And trigger them to calculate the rates m.a1.Tick() m.a5.Tick() m.a15.Tick() - m.updateSnapshot() } // meterArbiter ticks meters every 5s from a single goroutine. diff --git a/metrics/timer.go b/metrics/timer.go index 2e1a9be47295..ac88e24ad1d9 100644 --- a/metrics/timer.go +++ b/metrics/timer.go @@ -199,7 +199,7 @@ func (t *StandardTimer) Snapshot() Timer { defer t.mutex.Unlock() return &TimerSnapshot{ histogram: t.histogram.Snapshot().(*HistogramSnapshot), - meter: t.meter.Snapshot().(*MeterSnapshot), + meter: t.meter.Snapshot().(*meterSnapshot), } } @@ -249,7 +249,7 @@ func (t *StandardTimer) Variance() float64 { // TimerSnapshot is a read-only copy of another Timer. type TimerSnapshot struct { histogram *HistogramSnapshot - meter *MeterSnapshot + meter *meterSnapshot } // Count returns the number of events recorded at the time the snapshot was