Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unregister metrics when closing broker #991

Merged
merged 1 commit into from
Dec 5, 2017

Conversation

eapache
Copy link
Contributor

@eapache eapache commented Nov 29, 2017

@slaunay @mihasya @lday0321 this should hopefully solve #897? I don't know if I've covered all the cases.

Wanted to do this for a while, but upstream didn't have an unregister
method.
@slaunay
Copy link
Contributor

slaunay commented Nov 30, 2017

This will reduce most of the memory leak as it is mostly tied to metrics.Meter created by non seed Broker.
That being said the following meters would still leak when closing a Client (or an AsyncProducer/Consumer) and then creating a new one (not sure if this is a common use case):

  • incoming-byte-rate
  • outgoing-byte-rate
  • request-rate
  • record-send-rate
  • record-send-rate-for-topic-<topic>

Also if a custom metrics.Registry is provided in the Config, those metrics will still exists in the registry after the Client has been closed. This can be used to contain such memory leak by reusing that custom metrics.Registry but it's abusing a side effect.
If we want to add more metrics in the future (e.g. per topic and partition) this can become problematic.

I can provide a similar PR for unregistering broker metrics when a Broker is closed (as done here) plus unregistering global ones when a Client is closed.
Ensuring backward compatibility will make such a fix more convoluted that the current one.

Here is a benchmark that can be used with pprof to check if memory is still leaking:

func BenchmarkNewClientWithMeterMetric(b *testing.B) {
    setupFunctionalTest(b)
    defer teardownFunctionalTest(b)

    for i := 0; i < b.N; i++ {
        client, err := NewClient(kafkaBrokers, nil)
        if err != nil {
            b.Fatal(err)
        }
        safeClose(b, client)
    }
}

And the pprof bits to see what was not garbage collected at the end of the benchmark (-memprofile triggers a GC before profiling the heap):

$ go test -bench=NewClient -benchmem -memprofilerate=1 -memprofile mem.prof -run=^$ -v -benchtime=10s
BenchmarkNewClientWithMeterMetric-8         5000           2679507 ns/op           69010 B/op        185 allocs/op
PASS
ok      github.com/Shopify/sarama       13.682s
$ go tool pprof -top sarama.test mem.prof | grep metric
         0     0% 98.99%  6400.19kB 96.87%  github.com/Shopify/sarama/vendor/github.com/rcrowley/go-metrics.(*StandardRegistry).GetOrRegister
         0     0% 98.99%     6400kB 96.86%  github.com/Shopify/sarama/vendor/github.com/rcrowley/go-metrics.GetOrRegisterMeter
$ go tool pprof -list metric sarama.test mem.prof
Total: 6.45MB
ROUTINE ======================== github.com/Shopify/sarama/vendor/github.com/rcrowley/go-metrics.(*StandardRegistry).GetOrRegister in /.../github.com/Shopify/sarama/vendor/github.com/rcrowley/go-metrics/registry.go
         0     6.25MB (flat, cum) 96.87% of Total
         .          .     82:   defer r.mutex.Unlock()
         .          .     83:   if metric, ok := r.metrics[name]; ok {
         .          .     84:           return metric
         .          .     85:   }
         .          .     86:   if v := reflect.ValueOf(i); v.Kind() == reflect.Func {
         .     6.25MB     87:           i = v.Call(nil)[0].Interface()
         .          .     88:   }
         .          .     89:   r.register(name, i)
         .          .     90:   return i
         .          .     91:}
         .          .     92:
         .          .     93:// Register the given metric under the given name.  Returns a DuplicateMetric
         .          .     94:// if a metric by the given name is already registered.
ROUTINE ======================== github.com/Shopify/sarama/vendor/github.com/rcrowley/go-metrics.(*StandardRegistry).register in /.../github.com/Shopify/sarama/vendor/github.com/rcrowley/go-metrics/registry.go
         0          0 (flat, cum)     0% of Total
         .          .    131:   if _, ok := r.metrics[name]; ok {
         .          .    132:           return DuplicateMetric(name)
         .          .    133:   }
         .          .    134:   switch i.(type) {
         .          .    135:   case Counter, Gauge, GaugeFloat64, Healthcheck, Histogram, Meter, Timer:
         .          .    136:           r.metrics[name] = i
         .          .    137:   }
         .          .    138:   return nil
         .          .    139:}
         .          .    140:
         .          .    141:func (r *StandardRegistry) registered() map[string]interface{} {
ROUTINE ======================== github.com/Shopify/sarama/vendor/github.com/rcrowley/go-metrics.GetOrRegisterMeter in /.../github.com/Shopify/sarama/vendor/github.com/rcrowley/go-metrics/meter.go
         0     6.25MB (flat, cum) 96.86% of Total
         .          .     24:// allow for garbage collection.
         .          .     25:func GetOrRegisterMeter(name string, r Registry) Meter {
         .          .     26:   if nil == r {
         .          .     27:           r = DefaultRegistry
         .          .     28:   }
         .     6.25MB     29:   return r.GetOrRegister(name, NewMeter).(Meter)
         .          .     30:}
         .          .     31:
         .          .     32:// NewMeter constructs a new StandardMeter and launches a goroutine.
         .          .     33:// Be sure to call Stop() once the meter is of no use to allow for garbage collection.
         .          .     34:func NewMeter() Meter {

@eapache
Copy link
Contributor Author

eapache commented Dec 4, 2017

Ensuring backward compatibility will make such a fix more convoluted that the current one.

Why? Is this PR not backwards-compatible in some way I'm missing?

Does it make sense to merge this PR and open another one to unregister metrics on Client.Close or would you prefer to tackle it all at once?

@slaunay
Copy link
Contributor

slaunay commented Dec 4, 2017

Sorry for the confusion.

What I meant is that unregistering all the metrics (including the meters that leak memory) in a backward compatible way (e.g. not introducing breaking changes or requiring calling a new method for cleaning up) is not trivial as they are created in various places.
Of course backward compatibility is mandatory like it is the case in the current PR.

I can submit such a PR for unregistering all the metrics in the next few days but because they will touch the same lines it probably make sense to keep this one till you decide which solution is more sensible.

@eapache
Copy link
Contributor Author

eapache commented Dec 4, 2017

So I dug through the metrics a bit more and IIUC they fall in three categories:

  • per-broker metrics (which this PR fixes)
  • global broker metrics (which can't be automatically unregistered since they're not scoped to anything at all)
  • global produce-request metrics (which can't be automatically unregistered since they're not scoped to anything at all)

Have I missed a set or do you have a clever idea for the latter two?

@slaunay
Copy link
Contributor

slaunay commented Dec 4, 2017

The common scope of all the metrics is the Config struct as it references a local metrics.Registry but such struct does not provide a method for closing or releasing resources and it probably does not make sense to add one in there.
Metrics are never global per say as you can have two AsyncProducer with their own Config and the metrics would not collide (referenced by their own metrics.Registry).

The Client interface is a common component used by high level components (AsyncProducer, Consumer) and is often stored with a flag (ownClient) in order to decide when to clean up resources.
In order to unregister metrics, we need to keep track of them potentially with a scope (per broker or "global") and choose when it is safe to proceed with the unregistration.

One solution would be to expose a struct implementing metrics.Registry as a wrapper against the one configured in the Config struct for registering metrics and differentiate the broker metrics from the "global" ones through the name (-for-broker-).
Another solution would be to have a struct referencing the metrics.Registry from the Config struct with explicit methods to register a broker or a "global" metric.
I do prefer the second one as it is explicit.

Then whenever we close a Client (directly or through a higher level component like AsyncProducer) we would also cleanup the metrics.
We could also do that in the AsyncProducer or Consumer too but that seems redundant.

So the idea would be to tie the life cycle of the metrics to the life cycle of the Client with some abstraction to register/reference metrics so that we can clean them up.
This is more convoluted that the current PR but as long as the user closes the Client or the AsyncProducer (which he must already) we should be able to unregister any metrics that was added to the configured metrics.Registry (keep in mind that it could be a custom one) and therefore fix the memory leak.

Also, one other use case that comes to mind would be to have producer metrics like record-retry-rate (meter 😢) that would be fed by the AsyncProducer.
I believe the internal Client is closed once the AsyncProducer is shutdown so we should be able to support such metrics too.
It is actually already the case in a way for metrics created when building a ProduceRequest.

The PR implementing such behavior should clear things up I hope and provide good material for discussion.

@eapache
Copy link
Contributor Author

eapache commented Dec 5, 2017

The common scope of all the metrics is the Config struct as it references a local metrics.Registry

Right

but such struct does not provide a method for closing or releasing resources

The registry itself provides a UnregisterAll method... I'm tempted to just tell people they should call that when they're done with a registry. Anything else is gonna cause a bunch of complications when people re-use configs.

Ugh, this is messy.

Thanks for the detailed explanation - I'm gonna merge this PR and leave the rest of it well enough alone. This fixes the majority of the realistic issues and as you mentioned everything else gets super-complicated because we don't have anything which precisely matches the right scopes. Not worth spending time on right now as long as it's just hypothetical.

@eapache eapache merged commit cd645bf into master Dec 5, 2017
@eapache eapache deleted the cleanup-broker-metric-leaks branch December 5, 2017 15:08
@slaunay
Copy link
Contributor

slaunay commented Dec 5, 2017

👍 Fair enough, keeping it simple.
The broker meters were the ones mainly leaking memory although it never really impacted our applications.

The only drawback of using registry.UnregisterAll is that when using the PrefixedRegistry it does not behave as expected (at least from my perspective):

package main

import (
        "fmt"
        "os"

        "github.com/Shopify/sarama"
        "github.com/rcrowley/go-metrics"
)

func main() {
        appMetricRegistry := metrics.NewRegistry()
        appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry)
        appGauge.Update(1)

        config := sarama.NewConfig()
        // Use a prefix registry instead of the default local one
        config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.")

        // Simulate a metric created by sarama without starting a broker
        saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry)
        saramaGauge.Update(2)

        fmt.Println("App metrics before:")
        metrics.WriteOnce(appMetricRegistry, os.Stdout)

        // Unregister all the sarama metrics
        // but will unregister the parent registry metrics too...
        config.MetricRegistry.UnregisterAll()

        fmt.Println("App metrics after:")
        metrics.WriteOnce(appMetricRegistry, os.Stdout)
        // Output:
        // App metrics before:
	// gauge m1
	//   value:               1
	// gauge sarama.m2
        //   value:               2
        // App metrics after:
}

But this is a bug in go-metrics and one can easily build a different implementation of PrefixedRegistry.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants