From c83b0eae9e58b4f70b0e663fbb7dfa7b1fdfdb39 Mon Sep 17 00:00:00 2001 From: Hanno Hecker Date: Wed, 8 Feb 2017 17:25:25 +0100 Subject: [PATCH 1/5] add Stop() to Meter --- meter.go | 34 +++++++++++++++++++++++++++++++--- meter_test.go | 4 +++- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/meter.go b/meter.go index 0389ab0..d308526 100644 --- a/meter.go +++ b/meter.go @@ -15,6 +15,7 @@ type Meter interface { Rate15() float64 RateMean() float64 Snapshot() Meter + Stop() } // GetOrRegisterMeter returns an existing Meter or constructs and registers a @@ -34,7 +35,8 @@ func NewMeter() Meter { m := newStandardMeter() arbiter.Lock() defer arbiter.Unlock() - arbiter.meters = append(arbiter.meters, m) + m.id = arbiter.newID() + arbiter.meters[m.id] = m if !arbiter.started { arbiter.started = true go arbiter.tick() @@ -86,6 +88,9 @@ func (m *MeterSnapshot) RateMean() float64 { return m.rateMean } // Snapshot returns the snapshot. func (m *MeterSnapshot) Snapshot() Meter { return m } +// Stop is a no-op. +func (m *MeterSnapshot) Stop() {} + // NilMeter is a no-op Meter. type NilMeter struct{} @@ -110,12 +115,17 @@ func (NilMeter) RateMean() float64 { return 0.0 } // Snapshot is a no-op. func (NilMeter) Snapshot() Meter { return NilMeter{} } +// Stop is a no-op. +func (NilMeter) Stop() {} + // StandardMeter is the standard implementation of a Meter. type StandardMeter struct { lock sync.RWMutex snapshot *MeterSnapshot a1, a5, a15 EWMA startTime time.Time + stopped bool + id int64 } func newStandardMeter() *StandardMeter { @@ -128,6 +138,14 @@ func newStandardMeter() *StandardMeter { } } +// Stop stops the meter, Mark() will panic if you use it after being stopped. +func (m *StandardMeter) Stop() { + arbiter.Lock() + defer arbiter.Unlock() + m.stopped = true + delete(arbiter.meters, m.id) +} + // Count returns the number of events recorded. func (m *StandardMeter) Count() int64 { m.lock.RLock() @@ -140,6 +158,9 @@ func (m *StandardMeter) Count() int64 { func (m *StandardMeter) Mark(n int64) { m.lock.Lock() defer m.lock.Unlock() + if m.stopped { + panic("Mark called on a stopped Meter") + } m.snapshot.count += n m.a1.Update(n) m.a5.Update(n) @@ -208,11 +229,12 @@ func (m *StandardMeter) tick() { type meterArbiter struct { sync.RWMutex started bool - meters []*StandardMeter + meters map[int64]*StandardMeter ticker *time.Ticker + id int64 } -var arbiter = meterArbiter{ticker: time.NewTicker(5e9)} +var arbiter = meterArbiter{ticker: time.NewTicker(5e9), meters: make(map[int64]*StandardMeter)} // Ticks meters on the scheduled interval func (ma *meterArbiter) tick() { @@ -224,6 +246,12 @@ func (ma *meterArbiter) tick() { } } +// should only be called with Lock() held +func (ma *meterArbiter) newID() int64 { + ma.id++ + return ma.id +} + func (ma *meterArbiter) tickMeters() { ma.RLock() defer ma.RUnlock() diff --git a/meter_test.go b/meter_test.go index 1727612..93e8ad8 100644 --- a/meter_test.go +++ b/meter_test.go @@ -24,9 +24,11 @@ func TestGetOrRegisterMeter(t *testing.T) { func TestMeterDecay(t *testing.T) { ma := meterArbiter{ ticker: time.NewTicker(time.Millisecond), + meters: make(map[int64]*StandardMeter), } m := newStandardMeter() - ma.meters = append(ma.meters, m) + m.id = ma.newID() + ma.meters[m.id] = m go ma.tick() m.Mark(1) rateMean := m.RateMean() From 4b2ddcb0bf13dce7963e46b97826d2695ed2f614 Mon Sep 17 00:00:00 2001 From: inooka_shiroyuki Date: Fri, 14 Apr 2017 12:04:09 +0900 Subject: [PATCH 2/5] add Stop() to Timer --- meter_test.go | 12 ++++++++++++ registry.go | 15 +++++++++++++++ registry_test.go | 17 +++++++++++++++++ timer.go | 12 ++++++++++++ timer_test.go | 12 ++++++++++++ 5 files changed, 68 insertions(+) diff --git a/meter_test.go b/meter_test.go index 93e8ad8..a9f66d7 100644 --- a/meter_test.go +++ b/meter_test.go @@ -46,6 +46,18 @@ func TestMeterNonzero(t *testing.T) { } } +func TestMeterStop(t *testing.T) { + l := len(arbiter.meters) + m := NewMeter() + if len(arbiter.meters) != l+1 { + t.Errorf("arbiter.meters: %d != %d\n", l+1, len(arbiter.meters)) + } + m.Stop() + if len(arbiter.meters) != l { + t.Errorf("arbiter.meters: %d != %d\n", l, len(arbiter.meters)) + } +} + func TestMeterSnapshot(t *testing.T) { m := NewMeter() m.Mark(1) diff --git a/registry.go b/registry.go index 2bb7a1e..f96fbd1 100644 --- a/registry.go +++ b/registry.go @@ -113,6 +113,7 @@ func (r *StandardRegistry) RunHealthchecks() { func (r *StandardRegistry) Unregister(name string) { r.mutex.Lock() defer r.mutex.Unlock() + r.stop(name) delete(r.metrics, name) } @@ -121,6 +122,7 @@ func (r *StandardRegistry) UnregisterAll() { r.mutex.Lock() defer r.mutex.Unlock() for name, _ := range r.metrics { + r.stop(name) delete(r.metrics, name) } } @@ -146,6 +148,19 @@ func (r *StandardRegistry) registered() map[string]interface{} { return metrics } +func (r *StandardRegistry) stop(name string) { + if i, ok := r.metrics[name]; ok { + if s, ok := i.(Stoppable); ok { + s.Stop() + } + } +} + +// Stoppable defines the metrics which has to be stopped. +type Stoppable interface { + Stop() +} + type PrefixedRegistry struct { underlying Registry prefix string diff --git a/registry_test.go b/registry_test.go index b42d4ee..a63e485 100644 --- a/registry_test.go +++ b/registry_test.go @@ -119,6 +119,23 @@ func TestRegistryGetOrRegisterWithLazyInstantiation(t *testing.T) { } } +func TestRegistryUnregister(t *testing.T) { + l := len(arbiter.meters) + r := NewRegistry() + r.Register("foo", NewCounter()) + r.Register("bar", NewMeter()) + r.Register("baz", NewTimer()) + if len(arbiter.meters) != l+2 { + t.Errorf("arbiter.meters: %d != %d\n", l+2, len(arbiter.meters)) + } + r.Unregister("foo") + r.Unregister("bar") + r.Unregister("baz") + if len(arbiter.meters) != l { + t.Errorf("arbiter.meters: %d != %d\n", l+2, len(arbiter.meters)) + } +} + func TestPrefixedChildRegistryGetOrRegister(t *testing.T) { r := NewRegistry() pr := NewPrefixedChildRegistry(r, "prefix.") diff --git a/timer.go b/timer.go index 17db8f8..d817e78 100644 --- a/timer.go +++ b/timer.go @@ -19,6 +19,7 @@ type Timer interface { RateMean() float64 Snapshot() Timer StdDev() float64 + Stop() Sum() int64 Time(func()) Update(time.Duration) @@ -112,6 +113,9 @@ func (NilTimer) Snapshot() Timer { return NilTimer{} } // StdDev is a no-op. func (NilTimer) StdDev() float64 { return 0.0 } +// Stop is a no-op. +func (NilTimer) Stop() {} + // Sum is a no-op. func (NilTimer) Sum() int64 { return 0 } @@ -201,6 +205,11 @@ func (t *StandardTimer) StdDev() float64 { return t.histogram.StdDev() } +// Stop stops the meter. +func (t *StandardTimer) Stop() { + t.meter.Stop() +} + // Sum returns the sum in the sample. func (t *StandardTimer) Sum() int64 { return t.histogram.Sum() @@ -288,6 +297,9 @@ func (t *TimerSnapshot) Snapshot() Timer { return t } // was taken. func (t *TimerSnapshot) StdDev() float64 { return t.histogram.StdDev() } +// Stop is a no-op. +func (t *TimerSnapshot) Stop() {} + // Sum returns the sum at the time the snapshot was taken. func (t *TimerSnapshot) Sum() int64 { return t.histogram.Sum() } diff --git a/timer_test.go b/timer_test.go index 313d691..f85c9b8 100644 --- a/timer_test.go +++ b/timer_test.go @@ -32,6 +32,18 @@ func TestTimerExtremes(t *testing.T) { } } +func TestTimerStop(t *testing.T) { + l := len(arbiter.meters) + tm := NewTimer() + if len(arbiter.meters) != l+1 { + t.Errorf("arbiter.meters: %d != %d\n", l+1, len(arbiter.meters)) + } + tm.Stop() + if len(arbiter.meters) != l { + t.Errorf("arbiter.meters: %d != %d\n", l, len(arbiter.meters)) + } +} + func TestTimerFunc(t *testing.T) { tm := NewTimer() tm.Time(func() { time.Sleep(50e6) }) From ff4b9a81db61c463c0cac067489b4ca763b826ef Mon Sep 17 00:00:00 2001 From: inooka_shiroyuki Date: Fri, 12 May 2017 14:02:40 +0900 Subject: [PATCH 3/5] use lock for meter stopped flag --- meter.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/meter.go b/meter.go index d308526..882029e 100644 --- a/meter.go +++ b/meter.go @@ -140,10 +140,15 @@ func newStandardMeter() *StandardMeter { // Stop stops the meter, Mark() will panic if you use it after being stopped. func (m *StandardMeter) Stop() { - arbiter.Lock() - defer arbiter.Unlock() + m.lock.Lock() + stopped := m.stopped m.stopped = true - delete(arbiter.meters, m.id) + m.lock.Unlock() + if !stopped { + arbiter.Lock() + delete(arbiter.meters, m.id) + arbiter.Unlock() + } } // Count returns the number of events recorded. @@ -159,7 +164,7 @@ func (m *StandardMeter) Mark(n int64) { m.lock.Lock() defer m.lock.Unlock() if m.stopped { - panic("Mark called on a stopped Meter") + return } m.snapshot.count += n m.a1.Update(n) From 9180854cf5a015b52b88b8c4d3aebeef411fbe78 Mon Sep 17 00:00:00 2001 From: Sebastien Launay Date: Sat, 2 Sep 2017 15:25:18 -0700 Subject: [PATCH 4/5] Use set of meters and update documentation - use map[*StandardMeter]struct{} instead of map[int64]*StandardMeter to avoid id management pitfalls - update documentation and examples --- README.md | 12 +++++++++++- meter.go | 28 +++++++++++++--------------- meter_test.go | 5 ++--- timer.go | 6 ++++++ 4 files changed, 32 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 2d1a6dc..0405fbd 100644 --- a/README.md +++ b/README.md @@ -42,12 +42,22 @@ t.Update(47) Register() is not threadsafe. For threadsafe metric registration use GetOrRegister: -``` +```go t := metrics.GetOrRegisterTimer("account.create.latency", nil) t.Time(func() {}) t.Update(47) ``` +**NOTE:** Be sure to either unregister meters and timers otherwise and they will +leak memory: + +```go +// Will call Stop() on the Meter to allow for garbage collection +metrics.Unregister("quux") +// Or similarly for a Timer that embeds a Meter +metrics.Unregister("bang") +``` + Periodically log every metric in human-readable form to standard error: ```go diff --git a/meter.go b/meter.go index 882029e..53ff329 100644 --- a/meter.go +++ b/meter.go @@ -20,6 +20,8 @@ type Meter interface { // GetOrRegisterMeter returns an existing Meter or constructs and registers a // new StandardMeter. +// Be sure to unregister the meter from the registry once it is of no use to +// allow for garbage collection. func GetOrRegisterMeter(name string, r Registry) Meter { if nil == r { r = DefaultRegistry @@ -28,6 +30,7 @@ func GetOrRegisterMeter(name string, r Registry) Meter { } // NewMeter constructs a new StandardMeter and launches a goroutine. +// Be sure to call Stop() once the meter is of no use to allow for garbage collection. func NewMeter() Meter { if UseNilMetrics { return NilMeter{} @@ -35,8 +38,7 @@ func NewMeter() Meter { m := newStandardMeter() arbiter.Lock() defer arbiter.Unlock() - m.id = arbiter.newID() - arbiter.meters[m.id] = m + arbiter.meters[m] = struct{}{} if !arbiter.started { arbiter.started = true go arbiter.tick() @@ -46,6 +48,8 @@ func NewMeter() Meter { // NewMeter constructs and registers a new StandardMeter and launches a // goroutine. +// Be sure to unregister the meter from the registry once it is of no use to +// allow for garbage collection. func NewRegisteredMeter(name string, r Registry) Meter { c := NewMeter() if nil == r { @@ -125,7 +129,6 @@ type StandardMeter struct { a1, a5, a15 EWMA startTime time.Time stopped bool - id int64 } func newStandardMeter() *StandardMeter { @@ -138,7 +141,7 @@ func newStandardMeter() *StandardMeter { } } -// Stop stops the meter, Mark() will panic if you use it after being stopped. +// Stop stops the meter, Mark() will be a no-op if you use it after being stopped. func (m *StandardMeter) Stop() { m.lock.Lock() stopped := m.stopped @@ -146,7 +149,7 @@ func (m *StandardMeter) Stop() { m.lock.Unlock() if !stopped { arbiter.Lock() - delete(arbiter.meters, m.id) + delete(arbiter.meters, m) arbiter.Unlock() } } @@ -231,15 +234,16 @@ func (m *StandardMeter) tick() { m.updateSnapshot() } +// meterArbiter ticks meters every 5s from a single goroutine. +// meters are references in a set for future stopping. type meterArbiter struct { sync.RWMutex started bool - meters map[int64]*StandardMeter + meters map[*StandardMeter]struct{} ticker *time.Ticker - id int64 } -var arbiter = meterArbiter{ticker: time.NewTicker(5e9), meters: make(map[int64]*StandardMeter)} +var arbiter = meterArbiter{ticker: time.NewTicker(5e9), meters: make(map[*StandardMeter]struct{})} // Ticks meters on the scheduled interval func (ma *meterArbiter) tick() { @@ -251,16 +255,10 @@ func (ma *meterArbiter) tick() { } } -// should only be called with Lock() held -func (ma *meterArbiter) newID() int64 { - ma.id++ - return ma.id -} - func (ma *meterArbiter) tickMeters() { ma.RLock() defer ma.RUnlock() - for _, meter := range ma.meters { + for meter := range ma.meters { meter.tick() } } diff --git a/meter_test.go b/meter_test.go index a9f66d7..e889222 100644 --- a/meter_test.go +++ b/meter_test.go @@ -24,11 +24,10 @@ func TestGetOrRegisterMeter(t *testing.T) { func TestMeterDecay(t *testing.T) { ma := meterArbiter{ ticker: time.NewTicker(time.Millisecond), - meters: make(map[int64]*StandardMeter), + meters: make(map[*StandardMeter]struct{}), } m := newStandardMeter() - m.id = ma.newID() - ma.meters[m.id] = m + ma.meters[m] = struct{}{} go ma.tick() m.Mark(1) rateMean := m.RateMean() diff --git a/timer.go b/timer.go index d817e78..d6ec4c6 100644 --- a/timer.go +++ b/timer.go @@ -29,6 +29,8 @@ type Timer interface { // GetOrRegisterTimer returns an existing Timer or constructs and registers a // new StandardTimer. +// Be sure to unregister the meter from the registry once it is of no use to +// allow for garbage collection. func GetOrRegisterTimer(name string, r Registry) Timer { if nil == r { r = DefaultRegistry @@ -37,6 +39,7 @@ func GetOrRegisterTimer(name string, r Registry) Timer { } // NewCustomTimer constructs a new StandardTimer from a Histogram and a Meter. +// Be sure to call Stop() once the timer is of no use to allow for garbage collection. func NewCustomTimer(h Histogram, m Meter) Timer { if UseNilMetrics { return NilTimer{} @@ -48,6 +51,8 @@ func NewCustomTimer(h Histogram, m Meter) Timer { } // NewRegisteredTimer constructs and registers a new StandardTimer. +// Be sure to unregister the meter from the registry once it is of no use to +// allow for garbage collection. func NewRegisteredTimer(name string, r Registry) Timer { c := NewTimer() if nil == r { @@ -59,6 +64,7 @@ func NewRegisteredTimer(name string, r Registry) Timer { // NewTimer constructs a new StandardTimer using an exponentially-decaying // sample with the same reservoir size and alpha as UNIX load averages. +// Be sure to call Stop() once the timer is of no use to allow for garbage collection. func NewTimer() Timer { if UseNilMetrics { return NilTimer{} From 362f9b590cae71aa1199055e02d3347d47ae2c87 Mon Sep 17 00:00:00 2001 From: Mikhail P Date: Mon, 27 Nov 2017 19:30:44 -0800 Subject: [PATCH 5/5] more precision in Unregister comment --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 0405fbd..788146e 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ t.Time(func() {}) t.Update(47) ``` -**NOTE:** Be sure to either unregister meters and timers otherwise and they will +**NOTE:** Be sure to unregister short-lived meters and timers otherwise they will leak memory: ```go