Skip to content

Commit

Permalink
Cleanup prometheus metrics after a reload
Browse files Browse the repository at this point in the history
  • Loading branch information
aledbf committed Jun 27, 2018
1 parent fd0ee62 commit af4cfd2
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 16 deletions.
20 changes: 9 additions & 11 deletions cmd/nginx/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,24 @@ func main() {

conf.Client = kubeClient

ngx := controller.NewNGINXController(conf, fs)

go handleSigterm(ngx, func(code int) {
os.Exit(code)
})

mux := http.NewServeMux()
go registerHandlers(conf.EnableProfiling, conf.ListenPorts.Health, ngx, mux)

err = collector.InitNGINXStatusCollector(conf.Namespace, class.IngressClass, conf.ListenPorts.Status)

if err != nil {
glog.Fatalf("Error creating metric collector: %v", err)
}

err = collector.NewInstance(conf.Namespace, class.IngressClass)
mc, err := collector.NewInstance(conf.Namespace, class.IngressClass)
if err != nil {
glog.Fatalf("Error creating unix socket server: %v", err)
}

ngx := controller.NewNGINXController(conf, mc, fs)
go handleSigterm(ngx, func(code int) {
os.Exit(code)
})

mux := http.NewServeMux()
go registerHandlers(conf.EnableProfiling, conf.ListenPorts.Health, ngx, mux)

ngx.Start()
}

Expand Down
32 changes: 32 additions & 0 deletions internal/ingress/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ func (n *NGINXController) syncIngress(interface{}) error {
}(isFirstSync)
}

re := getRemovedEndpoints(n.runningConfig, &pcfg)
if len(re) > 0 {
go n.metricCollector.RemoveMetrics("upstream_ip", re)
}

n.runningConfig = &pcfg

return nil
Expand Down Expand Up @@ -1086,3 +1091,30 @@ func extractTLSSecretName(host string, ing *extensions.Ingress,

return ""
}

// getRemovedEndpoints returns a list of the endpoints (IP address)
// that are not associated anymore to the NGINX configuration.
func getRemovedEndpoints(rucfg, newcfg *ingress.Configuration) []string {
oldEps := sets.NewString()
newEps := sets.NewString()

for _, b := range rucfg.Backends {
for _, ep := range b.Endpoints {
ea := fmt.Sprintf("%v:%v", ep.Address, ep.Port)
if !oldEps.Has(ea) {
oldEps.Insert(ea)
}
}
}

for _, b := range newcfg.Backends {
for _, ep := range b.Endpoints {
ea := fmt.Sprintf("%v:%v", ep.Address, ep.Port)
if !newEps.Has(ea) {
newEps.Insert(ea)
}
}
}

return oldEps.Difference(newEps).List()
}
7 changes: 6 additions & 1 deletion internal/ingress/controller/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"k8s.io/ingress-nginx/internal/ingress/controller/process"
"k8s.io/ingress-nginx/internal/ingress/controller/store"
ngx_template "k8s.io/ingress-nginx/internal/ingress/controller/template"
"k8s.io/ingress-nginx/internal/ingress/metric/collector"
"k8s.io/ingress-nginx/internal/ingress/status"
ing_net "k8s.io/ingress-nginx/internal/net"
"k8s.io/ingress-nginx/internal/net/dns"
Expand All @@ -70,7 +71,7 @@ var (
)

// NewNGINXController creates a new NGINX Ingress controller.
func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXController {
func NewNGINXController(config *Configuration, mc *collector.SocketCollector, fs file.Filesystem) *NGINXController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
Expand Down Expand Up @@ -103,6 +104,8 @@ func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXControl
runningConfig: new(ingress.Configuration),

Proxy: &TCPProxy{},

metricCollector: mc,
}

n.store = store.New(
Expand Down Expand Up @@ -243,6 +246,8 @@ type NGINXController struct {
store store.Storer

fileSystem filesystem.Filesystem

metricCollector *collector.SocketCollector
}

// Start starts a new NGINX master process running in the foreground.
Expand Down
38 changes: 34 additions & 4 deletions internal/ingress/metric/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ type SocketCollector struct {
}

// NewInstance creates a new SocketCollector instance
func NewInstance(ns string, class string) error {
sc := SocketCollector{}
func NewInstance(ns string, class string) (*SocketCollector, error) {
sc := &SocketCollector{}

ns = strings.Replace(ns, "-", "_", -1)

listener, err := net.Listen("unix", "/tmp/prometheus-nginx.socket")
if err != nil {
return err
return nil, err
}

sc.listener = listener
Expand Down Expand Up @@ -156,7 +156,7 @@ func NewInstance(ns string, class string) error {

go sc.Run()

return nil
return sc, nil
}

func (sc *SocketCollector) handleMessage(msg []byte) {
Expand Down Expand Up @@ -273,6 +273,36 @@ func (sc *SocketCollector) Run() {
}
}

// RemoveMetrics cleans up any metrics that do not have a respective value
// associated with the label anymore.
func (sc *SocketCollector) RemoveMetrics(label string, values []string) {
for _, val := range values {
glog.Infof("Removing prometheus metric %v=%v", label, val)
l := prometheus.Labels{}
l[label] = val

removed := sc.upstreamResponseTime.Delete(l)
if !removed {
glog.Warningf("prometheus histogram upstream response time does not contain metrics for label %v=%v", label, val)
}

removed = sc.requestTime.Delete(l)
if !removed {
glog.Warningf("prometheus histogram request time does not contain metrics for label %v=%v", label, val)
}

removed = sc.requestLength.Delete(l)
if !removed {
glog.Warningf("prometheus histogram request length does not contain metrics for label %v=%v", label, val)
}

removed = sc.bytesSent.Delete(l)
if !removed {
glog.Warningf("prometheus histogram bytes sent does not contain metrics for label %v=%v", label, val)
}
}
}

const packetSize = 1024 * 65

// handleMessages process the content received in a network connection
Expand Down

0 comments on commit af4cfd2

Please sign in to comment.