Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop timer/meter #206

Merged
merged 6 commits into from
Nov 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,22 @@ t.Update(47)
Register() is not threadsafe. For threadsafe metric registration use
GetOrRegister:

```
```go
t := metrics.GetOrRegisterTimer("account.create.latency", nil)
t.Time(func() {})
t.Update(47)
```

**NOTE:** Be sure to unregister short-lived meters and timers otherwise they will
leak memory:

```go
// Will call Stop() on the Meter to allow for garbage collection
metrics.Unregister("quux")
// Or similarly for a Timer that embeds a Meter
metrics.Unregister("bang")
```

Periodically log every metric in human-readable form to standard error:

```go
Expand Down
39 changes: 35 additions & 4 deletions meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ type Meter interface {
Rate15() float64
RateMean() float64
Snapshot() Meter
Stop()
}

// GetOrRegisterMeter returns an existing Meter or constructs and registers a
// new StandardMeter.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func GetOrRegisterMeter(name string, r Registry) Meter {
if nil == r {
r = DefaultRegistry
Expand All @@ -27,14 +30,15 @@ func GetOrRegisterMeter(name string, r Registry) Meter {
}

// NewMeter constructs a new StandardMeter and launches a goroutine.
// Be sure to call Stop() once the meter is of no use to allow for garbage collection.
func NewMeter() Meter {
if UseNilMetrics {
return NilMeter{}
}
m := newStandardMeter()
arbiter.Lock()
defer arbiter.Unlock()
arbiter.meters = append(arbiter.meters, m)
arbiter.meters[m] = struct{}{}
if !arbiter.started {
arbiter.started = true
go arbiter.tick()
Expand All @@ -44,6 +48,8 @@ func NewMeter() Meter {

// NewMeter constructs and registers a new StandardMeter and launches a
// goroutine.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func NewRegisteredMeter(name string, r Registry) Meter {
c := NewMeter()
if nil == r {
Expand Down Expand Up @@ -86,6 +92,9 @@ func (m *MeterSnapshot) RateMean() float64 { return m.rateMean }
// Snapshot returns the snapshot.
func (m *MeterSnapshot) Snapshot() Meter { return m }

// Stop is a no-op.
func (m *MeterSnapshot) Stop() {}

// NilMeter is a no-op Meter.
type NilMeter struct{}

Expand All @@ -110,12 +119,16 @@ func (NilMeter) RateMean() float64 { return 0.0 }
// Snapshot is a no-op.
func (NilMeter) Snapshot() Meter { return NilMeter{} }

// Stop is a no-op.
func (NilMeter) Stop() {}

// StandardMeter is the standard implementation of a Meter.
type StandardMeter struct {
lock sync.RWMutex
snapshot *MeterSnapshot
a1, a5, a15 EWMA
startTime time.Time
stopped bool
}

func newStandardMeter() *StandardMeter {
Expand All @@ -128,6 +141,19 @@ func newStandardMeter() *StandardMeter {
}
}

// Stop stops the meter, Mark() will be a no-op if you use it after being stopped.
func (m *StandardMeter) Stop() {
m.lock.Lock()
stopped := m.stopped
m.stopped = true
m.lock.Unlock()
if !stopped {
arbiter.Lock()
delete(arbiter.meters, m)
arbiter.Unlock()
}
}

// Count returns the number of events recorded.
func (m *StandardMeter) Count() int64 {
m.lock.RLock()
Expand All @@ -140,6 +166,9 @@ func (m *StandardMeter) Count() int64 {
func (m *StandardMeter) Mark(n int64) {
m.lock.Lock()
defer m.lock.Unlock()
if m.stopped {
return
}
m.snapshot.count += n
m.a1.Update(n)
m.a5.Update(n)
Expand Down Expand Up @@ -205,14 +234,16 @@ func (m *StandardMeter) tick() {
m.updateSnapshot()
}

// meterArbiter ticks meters every 5s from a single goroutine.
// meters are references in a set for future stopping.
type meterArbiter struct {
sync.RWMutex
started bool
meters []*StandardMeter
meters map[*StandardMeter]struct{}
ticker *time.Ticker
}

var arbiter = meterArbiter{ticker: time.NewTicker(5e9)}
var arbiter = meterArbiter{ticker: time.NewTicker(5e9), meters: make(map[*StandardMeter]struct{})}

// Ticks meters on the scheduled interval
func (ma *meterArbiter) tick() {
Expand All @@ -227,7 +258,7 @@ func (ma *meterArbiter) tick() {
func (ma *meterArbiter) tickMeters() {
ma.RLock()
defer ma.RUnlock()
for _, meter := range ma.meters {
for meter := range ma.meters {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this syntax only work in newer Go? I would leave it as is to avoid breaking bw compatibility.

Edit: I see tests pass on all the old versions, disregard.

meter.tick()
}
}
15 changes: 14 additions & 1 deletion meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ func TestGetOrRegisterMeter(t *testing.T) {
func TestMeterDecay(t *testing.T) {
ma := meterArbiter{
ticker: time.NewTicker(time.Millisecond),
meters: make(map[*StandardMeter]struct{}),
}
m := newStandardMeter()
ma.meters = append(ma.meters, m)
ma.meters[m] = struct{}{}
go ma.tick()
m.Mark(1)
rateMean := m.RateMean()
Expand All @@ -44,6 +45,18 @@ func TestMeterNonzero(t *testing.T) {
}
}

func TestMeterStop(t *testing.T) {
l := len(arbiter.meters)
m := NewMeter()
if len(arbiter.meters) != l+1 {
t.Errorf("arbiter.meters: %d != %d\n", l+1, len(arbiter.meters))
}
m.Stop()
if len(arbiter.meters) != l {
t.Errorf("arbiter.meters: %d != %d\n", l, len(arbiter.meters))
}
}

func TestMeterSnapshot(t *testing.T) {
m := NewMeter()
m.Mark(1)
Expand Down
15 changes: 15 additions & 0 deletions registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (r *StandardRegistry) RunHealthchecks() {
func (r *StandardRegistry) Unregister(name string) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.stop(name)
delete(r.metrics, name)
}

Expand All @@ -121,6 +122,7 @@ func (r *StandardRegistry) UnregisterAll() {
r.mutex.Lock()
defer r.mutex.Unlock()
for name, _ := range r.metrics {
r.stop(name)
delete(r.metrics, name)
}
}
Expand All @@ -146,6 +148,19 @@ func (r *StandardRegistry) registered() map[string]interface{} {
return metrics
}

func (r *StandardRegistry) stop(name string) {
if i, ok := r.metrics[name]; ok {
if s, ok := i.(Stoppable); ok {
s.Stop()
}
}
}

// Stoppable defines the metrics which has to be stopped.
type Stoppable interface {
Stop()
}

type PrefixedRegistry struct {
underlying Registry
prefix string
Expand Down
17 changes: 17 additions & 0 deletions registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,23 @@ func TestRegistryGetOrRegisterWithLazyInstantiation(t *testing.T) {
}
}

func TestRegistryUnregister(t *testing.T) {
l := len(arbiter.meters)
r := NewRegistry()
r.Register("foo", NewCounter())
r.Register("bar", NewMeter())
r.Register("baz", NewTimer())
if len(arbiter.meters) != l+2 {
t.Errorf("arbiter.meters: %d != %d\n", l+2, len(arbiter.meters))
}
r.Unregister("foo")
r.Unregister("bar")
r.Unregister("baz")
if len(arbiter.meters) != l {
t.Errorf("arbiter.meters: %d != %d\n", l+2, len(arbiter.meters))
}
}

func TestPrefixedChildRegistryGetOrRegister(t *testing.T) {
r := NewRegistry()
pr := NewPrefixedChildRegistry(r, "prefix.")
Expand Down
18 changes: 18 additions & 0 deletions timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Timer interface {
RateMean() float64
Snapshot() Timer
StdDev() float64
Stop()
Sum() int64
Time(func())
Update(time.Duration)
Expand All @@ -28,6 +29,8 @@ type Timer interface {

// GetOrRegisterTimer returns an existing Timer or constructs and registers a
// new StandardTimer.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func GetOrRegisterTimer(name string, r Registry) Timer {
if nil == r {
r = DefaultRegistry
Expand All @@ -36,6 +39,7 @@ func GetOrRegisterTimer(name string, r Registry) Timer {
}

// NewCustomTimer constructs a new StandardTimer from a Histogram and a Meter.
// Be sure to call Stop() once the timer is of no use to allow for garbage collection.
func NewCustomTimer(h Histogram, m Meter) Timer {
if UseNilMetrics {
return NilTimer{}
Expand All @@ -47,6 +51,8 @@ func NewCustomTimer(h Histogram, m Meter) Timer {
}

// NewRegisteredTimer constructs and registers a new StandardTimer.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func NewRegisteredTimer(name string, r Registry) Timer {
c := NewTimer()
if nil == r {
Expand All @@ -58,6 +64,7 @@ func NewRegisteredTimer(name string, r Registry) Timer {

// NewTimer constructs a new StandardTimer using an exponentially-decaying
// sample with the same reservoir size and alpha as UNIX load averages.
// Be sure to call Stop() once the timer is of no use to allow for garbage collection.
func NewTimer() Timer {
if UseNilMetrics {
return NilTimer{}
Expand Down Expand Up @@ -112,6 +119,9 @@ func (NilTimer) Snapshot() Timer { return NilTimer{} }
// StdDev is a no-op.
func (NilTimer) StdDev() float64 { return 0.0 }

// Stop is a no-op.
func (NilTimer) Stop() {}

// Sum is a no-op.
func (NilTimer) Sum() int64 { return 0 }

Expand Down Expand Up @@ -201,6 +211,11 @@ func (t *StandardTimer) StdDev() float64 {
return t.histogram.StdDev()
}

// Stop stops the meter.
func (t *StandardTimer) Stop() {
t.meter.Stop()
}

// Sum returns the sum in the sample.
func (t *StandardTimer) Sum() int64 {
return t.histogram.Sum()
Expand Down Expand Up @@ -288,6 +303,9 @@ func (t *TimerSnapshot) Snapshot() Timer { return t }
// was taken.
func (t *TimerSnapshot) StdDev() float64 { return t.histogram.StdDev() }

// Stop is a no-op.
func (t *TimerSnapshot) Stop() {}

// Sum returns the sum at the time the snapshot was taken.
func (t *TimerSnapshot) Sum() int64 { return t.histogram.Sum() }

Expand Down
12 changes: 12 additions & 0 deletions timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ func TestTimerExtremes(t *testing.T) {
}
}

func TestTimerStop(t *testing.T) {
l := len(arbiter.meters)
tm := NewTimer()
if len(arbiter.meters) != l+1 {
t.Errorf("arbiter.meters: %d != %d\n", l+1, len(arbiter.meters))
}
tm.Stop()
if len(arbiter.meters) != l {
t.Errorf("arbiter.meters: %d != %d\n", l, len(arbiter.meters))
}
}

func TestTimerFunc(t *testing.T) {
tm := NewTimer()
tm.Time(func() { time.Sleep(50e6) })
Expand Down