Skip to content

Commit

Permalink
Merge pull request #206 from inooka-shiroyuki/stop-timer
Browse files Browse the repository at this point in the history
Stop timer/meter
  • Loading branch information
mihasya authored Nov 28, 2017
2 parents af27be0 + 362f9b5 commit 39aa482
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 6 deletions.
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,22 @@ t.Update(47)
Register() is not threadsafe. For threadsafe metric registration use
GetOrRegister:

```
```go
t := metrics.GetOrRegisterTimer("account.create.latency", nil)
t.Time(func() {})
t.Update(47)
```

**NOTE:** Be sure to unregister short-lived meters and timers otherwise they will
leak memory:

```go
// Will call Stop() on the Meter to allow for garbage collection
metrics.Unregister("quux")
// Or similarly for a Timer that embeds a Meter
metrics.Unregister("bang")
```

Periodically log every metric in human-readable form to standard error:

```go
Expand Down
39 changes: 35 additions & 4 deletions meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ type Meter interface {
Rate15() float64
RateMean() float64
Snapshot() Meter
Stop()
}

// GetOrRegisterMeter returns an existing Meter or constructs and registers a
// new StandardMeter.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func GetOrRegisterMeter(name string, r Registry) Meter {
if nil == r {
r = DefaultRegistry
Expand All @@ -27,14 +30,15 @@ func GetOrRegisterMeter(name string, r Registry) Meter {
}

// NewMeter constructs a new StandardMeter and launches a goroutine.
// Be sure to call Stop() once the meter is of no use to allow for garbage collection.
func NewMeter() Meter {
if UseNilMetrics {
return NilMeter{}
}
m := newStandardMeter()
arbiter.Lock()
defer arbiter.Unlock()
arbiter.meters = append(arbiter.meters, m)
arbiter.meters[m] = struct{}{}
if !arbiter.started {
arbiter.started = true
go arbiter.tick()
Expand All @@ -44,6 +48,8 @@ func NewMeter() Meter {

// NewMeter constructs and registers a new StandardMeter and launches a
// goroutine.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func NewRegisteredMeter(name string, r Registry) Meter {
c := NewMeter()
if nil == r {
Expand Down Expand Up @@ -86,6 +92,9 @@ func (m *MeterSnapshot) RateMean() float64 { return m.rateMean }
// Snapshot returns the snapshot.
func (m *MeterSnapshot) Snapshot() Meter { return m }

// Stop is a no-op.
func (m *MeterSnapshot) Stop() {}

// NilMeter is a no-op Meter.
type NilMeter struct{}

Expand All @@ -110,12 +119,16 @@ func (NilMeter) RateMean() float64 { return 0.0 }
// Snapshot is a no-op.
func (NilMeter) Snapshot() Meter { return NilMeter{} }

// Stop is a no-op.
func (NilMeter) Stop() {}

// StandardMeter is the standard implementation of a Meter.
type StandardMeter struct {
lock sync.RWMutex
snapshot *MeterSnapshot
a1, a5, a15 EWMA
startTime time.Time
stopped bool
}

func newStandardMeter() *StandardMeter {
Expand All @@ -128,6 +141,19 @@ func newStandardMeter() *StandardMeter {
}
}

// Stop stops the meter, Mark() will be a no-op if you use it after being stopped.
func (m *StandardMeter) Stop() {
m.lock.Lock()
stopped := m.stopped
m.stopped = true
m.lock.Unlock()
if !stopped {
arbiter.Lock()
delete(arbiter.meters, m)
arbiter.Unlock()
}
}

// Count returns the number of events recorded.
func (m *StandardMeter) Count() int64 {
m.lock.RLock()
Expand All @@ -140,6 +166,9 @@ func (m *StandardMeter) Count() int64 {
func (m *StandardMeter) Mark(n int64) {
m.lock.Lock()
defer m.lock.Unlock()
if m.stopped {
return
}
m.snapshot.count += n
m.a1.Update(n)
m.a5.Update(n)
Expand Down Expand Up @@ -205,14 +234,16 @@ func (m *StandardMeter) tick() {
m.updateSnapshot()
}

// meterArbiter ticks meters every 5s from a single goroutine.
// meters are references in a set for future stopping.
type meterArbiter struct {
sync.RWMutex
started bool
meters []*StandardMeter
meters map[*StandardMeter]struct{}
ticker *time.Ticker
}

var arbiter = meterArbiter{ticker: time.NewTicker(5e9)}
var arbiter = meterArbiter{ticker: time.NewTicker(5e9), meters: make(map[*StandardMeter]struct{})}

// Ticks meters on the scheduled interval
func (ma *meterArbiter) tick() {
Expand All @@ -227,7 +258,7 @@ func (ma *meterArbiter) tick() {
func (ma *meterArbiter) tickMeters() {
ma.RLock()
defer ma.RUnlock()
for _, meter := range ma.meters {
for meter := range ma.meters {
meter.tick()
}
}
15 changes: 14 additions & 1 deletion meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ func TestGetOrRegisterMeter(t *testing.T) {
func TestMeterDecay(t *testing.T) {
ma := meterArbiter{
ticker: time.NewTicker(time.Millisecond),
meters: make(map[*StandardMeter]struct{}),
}
m := newStandardMeter()
ma.meters = append(ma.meters, m)
ma.meters[m] = struct{}{}
go ma.tick()
m.Mark(1)
rateMean := m.RateMean()
Expand All @@ -44,6 +45,18 @@ func TestMeterNonzero(t *testing.T) {
}
}

func TestMeterStop(t *testing.T) {
l := len(arbiter.meters)
m := NewMeter()
if len(arbiter.meters) != l+1 {
t.Errorf("arbiter.meters: %d != %d\n", l+1, len(arbiter.meters))
}
m.Stop()
if len(arbiter.meters) != l {
t.Errorf("arbiter.meters: %d != %d\n", l, len(arbiter.meters))
}
}

func TestMeterSnapshot(t *testing.T) {
m := NewMeter()
m.Mark(1)
Expand Down
15 changes: 15 additions & 0 deletions registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (r *StandardRegistry) RunHealthchecks() {
func (r *StandardRegistry) Unregister(name string) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.stop(name)
delete(r.metrics, name)
}

Expand All @@ -121,6 +122,7 @@ func (r *StandardRegistry) UnregisterAll() {
r.mutex.Lock()
defer r.mutex.Unlock()
for name, _ := range r.metrics {
r.stop(name)
delete(r.metrics, name)
}
}
Expand All @@ -146,6 +148,19 @@ func (r *StandardRegistry) registered() map[string]interface{} {
return metrics
}

func (r *StandardRegistry) stop(name string) {
if i, ok := r.metrics[name]; ok {
if s, ok := i.(Stoppable); ok {
s.Stop()
}
}
}

// Stoppable defines the metrics which has to be stopped.
type Stoppable interface {
Stop()
}

type PrefixedRegistry struct {
underlying Registry
prefix string
Expand Down
17 changes: 17 additions & 0 deletions registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,23 @@ func TestRegistryGetOrRegisterWithLazyInstantiation(t *testing.T) {
}
}

func TestRegistryUnregister(t *testing.T) {
l := len(arbiter.meters)
r := NewRegistry()
r.Register("foo", NewCounter())
r.Register("bar", NewMeter())
r.Register("baz", NewTimer())
if len(arbiter.meters) != l+2 {
t.Errorf("arbiter.meters: %d != %d\n", l+2, len(arbiter.meters))
}
r.Unregister("foo")
r.Unregister("bar")
r.Unregister("baz")
if len(arbiter.meters) != l {
t.Errorf("arbiter.meters: %d != %d\n", l+2, len(arbiter.meters))
}
}

func TestPrefixedChildRegistryGetOrRegister(t *testing.T) {
r := NewRegistry()
pr := NewPrefixedChildRegistry(r, "prefix.")
Expand Down
18 changes: 18 additions & 0 deletions timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Timer interface {
RateMean() float64
Snapshot() Timer
StdDev() float64
Stop()
Sum() int64
Time(func())
Update(time.Duration)
Expand All @@ -28,6 +29,8 @@ type Timer interface {

// GetOrRegisterTimer returns an existing Timer or constructs and registers a
// new StandardTimer.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func GetOrRegisterTimer(name string, r Registry) Timer {
if nil == r {
r = DefaultRegistry
Expand All @@ -36,6 +39,7 @@ func GetOrRegisterTimer(name string, r Registry) Timer {
}

// NewCustomTimer constructs a new StandardTimer from a Histogram and a Meter.
// Be sure to call Stop() once the timer is of no use to allow for garbage collection.
func NewCustomTimer(h Histogram, m Meter) Timer {
if UseNilMetrics {
return NilTimer{}
Expand All @@ -47,6 +51,8 @@ func NewCustomTimer(h Histogram, m Meter) Timer {
}

// NewRegisteredTimer constructs and registers a new StandardTimer.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func NewRegisteredTimer(name string, r Registry) Timer {
c := NewTimer()
if nil == r {
Expand All @@ -58,6 +64,7 @@ func NewRegisteredTimer(name string, r Registry) Timer {

// NewTimer constructs a new StandardTimer using an exponentially-decaying
// sample with the same reservoir size and alpha as UNIX load averages.
// Be sure to call Stop() once the timer is of no use to allow for garbage collection.
func NewTimer() Timer {
if UseNilMetrics {
return NilTimer{}
Expand Down Expand Up @@ -112,6 +119,9 @@ func (NilTimer) Snapshot() Timer { return NilTimer{} }
// StdDev is a no-op.
func (NilTimer) StdDev() float64 { return 0.0 }

// Stop is a no-op.
func (NilTimer) Stop() {}

// Sum is a no-op.
func (NilTimer) Sum() int64 { return 0 }

Expand Down Expand Up @@ -201,6 +211,11 @@ func (t *StandardTimer) StdDev() float64 {
return t.histogram.StdDev()
}

// Stop stops the meter.
func (t *StandardTimer) Stop() {
t.meter.Stop()
}

// Sum returns the sum in the sample.
func (t *StandardTimer) Sum() int64 {
return t.histogram.Sum()
Expand Down Expand Up @@ -288,6 +303,9 @@ func (t *TimerSnapshot) Snapshot() Timer { return t }
// was taken.
func (t *TimerSnapshot) StdDev() float64 { return t.histogram.StdDev() }

// Stop is a no-op.
func (t *TimerSnapshot) Stop() {}

// Sum returns the sum at the time the snapshot was taken.
func (t *TimerSnapshot) Sum() int64 { return t.histogram.Sum() }

Expand Down
12 changes: 12 additions & 0 deletions timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ func TestTimerExtremes(t *testing.T) {
}
}

func TestTimerStop(t *testing.T) {
l := len(arbiter.meters)
tm := NewTimer()
if len(arbiter.meters) != l+1 {
t.Errorf("arbiter.meters: %d != %d\n", l+1, len(arbiter.meters))
}
tm.Stop()
if len(arbiter.meters) != l {
t.Errorf("arbiter.meters: %d != %d\n", l, len(arbiter.meters))
}
}

func TestTimerFunc(t *testing.T) {
tm := NewTimer()
tm.Time(func() { time.Sleep(50e6) })
Expand Down

0 comments on commit 39aa482

Please sign in to comment.