From 13d007e07122be34a156a542005a49425236a0be Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Tue, 13 Mar 2018 17:46:37 +0000 Subject: [PATCH 1/3] Reduce the number of reloads for endpoints changes --- internal/controller/controller.go | 52 ++++++++++++++++++------------- internal/nginx/configurator.go | 34 +++++++++++++------- internal/nginx/ingress.go | 10 ++++++ 3 files changed, 63 insertions(+), 33 deletions(-) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 8882679380..fdc5a1559e 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -277,14 +277,17 @@ func (lbc *LoadBalancerController) syncEndpoint(task queue.Task) { if endpExists { ings := lbc.getIngressForEndpoints(obj) - for _, ing := range ings { - if !lbc.IsNginxIngress(&ing) { + var ingExes []*nginx.IngressEx + var mergableIngresses []*nginx.MergeableIngresses + + for i := range ings { + if !lbc.isNginxIngress(&ings[i]) { continue } - if utils.IsMinion(&ing) { - master, err := lbc.FindMasterForMinion(&ing) + if isMinion(&ings[i]) { + master, err := lbc.findMasterForMinion(&ings[i]) if err != nil { - glog.Errorf("Ignoring Ingress %v(Minion): %v", ing.Name, err) + glog.Errorf("Ignoring Ingress %v(Minion): %v", ings[i].Name, err) continue } if !lbc.configurator.HasIngress(master) { @@ -292,30 +295,37 @@ func (lbc *LoadBalancerController) syncEndpoint(task queue.Task) { } mergeableIngresses, err := lbc.createMergableIngresses(master) if err != nil { - glog.Errorf("Ignoring Ingress %v(Minion): %v", ing.Name, err) + glog.Errorf("Ignoring Ingress %v(Minion): %v", ings[i].Name, err) continue } - glog.V(3).Infof("Updating Endpoints for %v/%v", ing.Namespace, ing.Name) - err = lbc.configurator.UpdateEndpointsMergeableIngress(mergeableIngresses) - if err != nil { - glog.Errorf("Error updating endpoints for %v/%v: %v", ing.Namespace, ing.Name, err) - } + mergableIngExes = append(mergableIngExes, mergableIngresses) continue } - if !lbc.configurator.HasIngress(&ing) { + if !lbc.cnf.HasIngress(&ings[i]) { continue } - ingEx, err := lbc.createIngress(&ing) + ingEx, err := lbc.createIngress(&ings[i]) if err != nil { - glog.Errorf("Error updating endpoints for %v/%v: %v, skipping", ing.Namespace, ing.Name, err) + glog.Errorf("Error updating endpoints for %v/%v: %v, skipping", &ings[i].Namespace, &ings[i].Name, err) continue } - glog.V(3).Infof("Updating Endpoints for %v/%v", ing.Namespace, ing.Name) - err = lbc.configurator.UpdateEndpoints(ingEx) - if err != nil { - glog.Errorf("Error updating endpoints for %v/%v: %v", ing.Namespace, ing.Name, err) - } + ingExes = append(ingExes, ingEx) + } + + if len(ingExes) == 0 { + return + } + + glog.V(3).Infof("Updating Endpoints for %v", ingExes) + lbc.cnf.UpdateEndpoints(ingExes) + if err != nil { + glog.Errorf("Error updating endpoints for %v: %v", ingExes, err) + } + glog.V(3).Infof("Updating Endpoints for %v", mergableIngresses) + // TODO FIX err = lbc.cnf.UpdateEndpointsMergeableIngress(mergeableIngresses) + if err != nil { + glog.Errorf("Error updating endpoints for %v/%v: %v", ing.Namespace, ing.Name, err) } } } @@ -1128,9 +1138,9 @@ func (lbc *LoadBalancerController) IsNginxIngress(ing *extensions.Ingress) bool return class == lbc.ingressClass } return class == lbc.ingressClass || class == "" + } else { + return !lbc.useIngressClassOnly } - return !lbc.useIngressClassOnly - } // isHealthCheckEnabled checks if health checks are enabled so we can only query pods if enabled. diff --git a/internal/nginx/configurator.go b/internal/nginx/configurator.go index c53fe18002..79a11c9c9a 100644 --- a/internal/nginx/configurator.go +++ b/internal/nginx/configurator.go @@ -962,27 +962,37 @@ func (cnf *Configurator) DeleteIngress(key string) error { return nil } -// UpdateEndpoints updates endpoints in NGINX configuration for the Ingress resource -func (cnf *Configurator) UpdateEndpoints(ingEx *IngressEx) error { - err := cnf.addOrUpdateIngress(ingEx) - if err != nil { - return fmt.Errorf("Error adding or updating ingress %v/%v: %v", ingEx.Ingress.Namespace, ingEx.Ingress.Name, err) - } +// UpdateEndpoints updates endpoints in NGINX configuration for the Ingress resources +func (cnf *Configurator) UpdateEndpoints(ingExes []*IngressEx) error { + reloadPlus := false - if cnf.isPlus() { - err = cnf.updatePlusEndpoints(ingEx) - if err == nil { - return nil + for _, ingEx := range ingExes { + err := cnf.addOrUpdateIngress(ingEx) + if err != nil { + return fmt.Errorf("Error adding or updating ingress %v/%v: %v", ingEx.Ingress.Namespace, ingEx.Ingress.Name, err) + } + + if cnf.isPlus() { + err := cnf.updatePlusEndpoints(ingEx) + if err != nil { + glog.Warningf("Couldn't update the endpoints via the API: %v; reloading configuration instead", err) + reloadPlus = true + } } - glog.Warningf("Couldn't update the endpoints via the API: %v; reloading configuration instead", err) } + + if cnf.isPlus() && !reloadPlus { + return nil + } + if err := cnf.nginx.Reload(); err != nil { - return fmt.Errorf("Error reloading NGINX when updating endpoints for %v/%v: %v", ingEx.Ingress.Namespace, ingEx.Ingress.Name, err) + return fmt.Errorf("Error reloading NGINX when updating endpoints: %v", err) } return nil } +// TODO FIX // UpdateEndpointsMergeableIngress updates endpoints in NGINX configuration for a mergeable Ingress resource func (cnf *Configurator) UpdateEndpointsMergeableIngress(mergeableIngs *MergeableIngresses) error { err := cnf.addOrUpdateMergeableIngress(mergeableIngs) diff --git a/internal/nginx/ingress.go b/internal/nginx/ingress.go index 12c4bb2c8e..ea188b3759 100644 --- a/internal/nginx/ingress.go +++ b/internal/nginx/ingress.go @@ -1,6 +1,8 @@ package nginx import ( + "fmt" + api_v1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" ) @@ -60,3 +62,11 @@ var minionInheritanceList = map[string]bool{ "nginx.org/max-fails": true, "nginx.org/fail-timeout": true, } + +func (ingEx *IngressEx) String() string { + if ingEx.Ingress == nil { + return "IngressEx has no Ingress" + } + + return fmt.Sprintf("%v/%v", ingEx.Ingress.Namespace, ingEx.Ingress.Name) +} From fc82d2509cf8087f9bce1893e3f420a9c44d2407 Mon Sep 17 00:00:00 2001 From: isaac Date: Tue, 4 Sep 2018 09:25:29 +0100 Subject: [PATCH 2/3] Avoid enqueueing ingress for service change - batch handling for mergable ingresses - error collection and logging --- internal/controller/controller.go | 10 +++--- internal/nginx/configurator.go | 52 +++++++++++++++++++---------- internal/nginx/configurator_test.go | 16 +++++---- 3 files changed, 50 insertions(+), 28 deletions(-) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index fdc5a1559e..6fd9c5934e 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -278,7 +278,7 @@ func (lbc *LoadBalancerController) syncEndpoint(task queue.Task) { ings := lbc.getIngressForEndpoints(obj) var ingExes []*nginx.IngressEx - var mergableIngresses []*nginx.MergeableIngresses + var mergableIngressesBatch []*nginx.MergeableIngresses for i := range ings { if !lbc.isNginxIngress(&ings[i]) { @@ -299,7 +299,7 @@ func (lbc *LoadBalancerController) syncEndpoint(task queue.Task) { continue } - mergableIngExes = append(mergableIngExes, mergableIngresses) + mergableIngressesBatch = append(mergableIngressesBatch, mergeableIngresses) continue } if !lbc.cnf.HasIngress(&ings[i]) { @@ -322,10 +322,10 @@ func (lbc *LoadBalancerController) syncEndpoint(task queue.Task) { if err != nil { glog.Errorf("Error updating endpoints for %v: %v", ingExes, err) } - glog.V(3).Infof("Updating Endpoints for %v", mergableIngresses) - // TODO FIX err = lbc.cnf.UpdateEndpointsMergeableIngress(mergeableIngresses) + glog.V(3).Infof("Updating Endpoints for %v", mergableIngressesBatch) + err = lbc.cnf.UpdateEndpointsMergeableIngress(mergableIngressesBatch) if err != nil { - glog.Errorf("Error updating endpoints for %v/%v: %v", ing.Namespace, ing.Name, err) + glog.Errorf("Error updating endpoints for %v: %v", mergableIngressesBatch, err) } } } diff --git a/internal/nginx/configurator.go b/internal/nginx/configurator.go index 79a11c9c9a..b01689d2a2 100644 --- a/internal/nginx/configurator.go +++ b/internal/nginx/configurator.go @@ -982,6 +982,7 @@ func (cnf *Configurator) UpdateEndpoints(ingExes []*IngressEx) error { } if cnf.isPlus() && !reloadPlus { + glog.V(3).Info("No need to reload nginx") return nil } @@ -992,32 +993,49 @@ func (cnf *Configurator) UpdateEndpoints(ingExes []*IngressEx) error { return nil } -// TODO FIX // UpdateEndpointsMergeableIngress updates endpoints in NGINX configuration for a mergeable Ingress resource -func (cnf *Configurator) UpdateEndpointsMergeableIngress(mergeableIngs *MergeableIngresses) error { - err := cnf.addOrUpdateMergeableIngress(mergeableIngs) - if err != nil { - return fmt.Errorf("Error adding or updating ingress %v/%v: %v", mergeableIngs.Master.Ingress.Namespace, mergeableIngs.Master.Ingress.Name, err) - } +func (cnf *Configurator) UpdateEndpointsMergeableIngress(mergableIngressesBatch []*MergeableIngresses) error { + reloadPlus := false + errors := []error{} + for i := range mergableIngressesBatch { + err := cnf.addOrUpdateMergeableIngress(mergableIngressesBatch[i]) + if err != nil { + glog.V(3).Infof("Error adding or updating ingress %v/%v: %v", mergableIngressesBatch[i].Master.Ingress.Namespace, mergableIngressesBatch[i].Master.Ingress.Name, err) + errors = append(errors, err) + continue + } - if cnf.isPlus() { - for _, ing := range mergeableIngs.Minions { - err = cnf.updatePlusEndpoints(ing) - if err != nil { - glog.Warningf("Couldn't update the endpoints via the API: %v; reloading configuration instead", err) - break + if cnf.isPlus() { + for _, ing := range mergableIngressesBatch[i].Minions { + err = cnf.updatePlusEndpoints(ing) + if err != nil { + glog.Warningf("Couldn't update the endpoints via the API: %v; reloading configuration instead", err) + errors = append(errors, err) + reloadPlus = true + break + } } } - if err == nil { - return nil - } + } + if cnf.isPlus() && !reloadPlus { + return consolodiateErrors(errors) } if err := cnf.nginx.Reload(); err != nil { - return fmt.Errorf("Error reloading NGINX when updating endpoints for %v/%v: %v", mergeableIngs.Master.Ingress.Namespace, mergeableIngs.Master.Ingress.Name, err) + return fmt.Errorf("Error reloading NGINX: %v. additional errors: %v", err, consolodiateErrors(errors)) } + return consolodiateErrors(errors) +} - return nil +func consolodiateErrors(errors []error) error { + errorString := "" + if len(errors) == 0 { + return nil + } + for _, e := range errors { + errorString = errorString + e.Error() + } + return fmt.Errorf(errorString) } func (cnf *Configurator) updatePlusEndpoints(ingEx *IngressEx) error { diff --git a/internal/nginx/configurator_test.go b/internal/nginx/configurator_test.go index f0ae5a66cb..28aa0eadf2 100644 --- a/internal/nginx/configurator_test.go +++ b/internal/nginx/configurator_test.go @@ -718,14 +718,15 @@ func TestUpdateEndpoints(t *testing.T) { } ingress := createCafeIngressEx() - err = cnf.UpdateEndpoints(&ingress) + ingresses := []*IngressEx{&ingress} + err = cnf.UpdateEndpoints(ingresses) if err != nil { t.Errorf("UpdateEndpoints returned\n%v, but expected \n%v", err, nil) } // test with OSS Configurator cnf.nginxAPI = nil - err = cnf.UpdateEndpoints(&ingress) + err = cnf.UpdateEndpoints(ingresses) if err != nil { t.Errorf("UpdateEndpoints returned\n%v, but expected \n%v", err, nil) } @@ -738,14 +739,15 @@ func TestUpdateEndpointsMergeableIngress(t *testing.T) { } mergeableIngress := createMergeableCafeIngress() - err = cnf.UpdateEndpointsMergeableIngress(mergeableIngress) + mergeableIngresses := []*MergeableIngresses{mergeableIngress} + err = cnf.UpdateEndpointsMergeableIngress(mergeableIngresses) if err != nil { t.Errorf("UpdateEndpointsMergeableIngress returned \n%v, but expected \n%v", err, nil) } // test with OSS Configurator cnf.nginxAPI = nil - err = cnf.UpdateEndpointsMergeableIngress(mergeableIngress) + err = cnf.UpdateEndpointsMergeableIngress(mergeableIngresses) if err != nil { t.Errorf("UpdateEndpointsMergeableIngress returned \n%v, but expected \n%v", err, nil) } @@ -758,7 +760,8 @@ func TestUpdateEndpointsFailsWithInvalidTemplate(t *testing.T) { } ingress := createCafeIngressEx() - err = cnf.UpdateEndpoints(&ingress) + ingresses := []*IngressEx{&ingress} + err = cnf.UpdateEndpoints(ingresses) if err == nil { t.Errorf("UpdateEndpoints returned\n%v, but expected \n%v", nil, "template execution error") } @@ -771,7 +774,8 @@ func TestUpdateEndpointsMergeableIngressFailsWithInvalidTemplate(t *testing.T) { } mergeableIngress := createMergeableCafeIngress() - err = cnf.UpdateEndpointsMergeableIngress(mergeableIngress) + mergeableIngresses := []*MergeableIngresses{mergeableIngress} + err = cnf.UpdateEndpointsMergeableIngress(mergeableIngresses) if err == nil { t.Errorf("UpdateEndpointsMergeableIngress returned \n%v, but expected \n%v", nil, "template execution error") } From 74645a24bd8f8c7b3407254ae62865bf9bc5de5a Mon Sep 17 00:00:00 2001 From: isaac Date: Wed, 5 Sep 2018 12:04:24 +0100 Subject: [PATCH 3/3] Simplify error handling - Most of the error handling in these batch update funcs can be removed. The caller only logs the errors anyway. - Fix a few rebase issues. --- internal/controller/controller.go | 40 +++++++++++++++---------------- internal/nginx/configurator.go | 33 +++++++------------------ 2 files changed, 29 insertions(+), 44 deletions(-) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 6fd9c5934e..706afc7859 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -278,19 +278,19 @@ func (lbc *LoadBalancerController) syncEndpoint(task queue.Task) { ings := lbc.getIngressForEndpoints(obj) var ingExes []*nginx.IngressEx - var mergableIngressesBatch []*nginx.MergeableIngresses + var mergableIngressesSlice []*nginx.MergeableIngresses for i := range ings { - if !lbc.isNginxIngress(&ings[i]) { + if !lbc.IsNginxIngress(&ings[i]) { continue } - if isMinion(&ings[i]) { - master, err := lbc.findMasterForMinion(&ings[i]) + if utils.IsMinion(&ings[i]) { + master, err := lbc.FindMasterForMinion(&ings[i]) if err != nil { glog.Errorf("Ignoring Ingress %v(Minion): %v", ings[i].Name, err) continue } - if !lbc.configurator.HasIngress(master) { + if !lbc.configurator.HasMinion(master, &ings[i]) { continue } mergeableIngresses, err := lbc.createMergableIngresses(master) @@ -299,10 +299,10 @@ func (lbc *LoadBalancerController) syncEndpoint(task queue.Task) { continue } - mergableIngressesBatch = append(mergableIngressesBatch, mergeableIngresses) + mergableIngressesSlice = append(mergableIngressesSlice, mergeableIngresses) continue } - if !lbc.cnf.HasIngress(&ings[i]) { + if !lbc.configurator.HasIngress(&ings[i]) { continue } ingEx, err := lbc.createIngress(&ings[i]) @@ -313,19 +313,20 @@ func (lbc *LoadBalancerController) syncEndpoint(task queue.Task) { ingExes = append(ingExes, ingEx) } - if len(ingExes) == 0 { - return + if len(ingExes) > 0 { + glog.V(3).Infof("Updating Endpoints for %v", ingExes) + err = lbc.configurator.UpdateEndpoints(ingExes) + if err != nil { + glog.Errorf("Error updating endpoints for %v: %v", ingExes, err) + } } - glog.V(3).Infof("Updating Endpoints for %v", ingExes) - lbc.cnf.UpdateEndpoints(ingExes) - if err != nil { - glog.Errorf("Error updating endpoints for %v: %v", ingExes, err) - } - glog.V(3).Infof("Updating Endpoints for %v", mergableIngressesBatch) - err = lbc.cnf.UpdateEndpointsMergeableIngress(mergableIngressesBatch) - if err != nil { - glog.Errorf("Error updating endpoints for %v: %v", mergableIngressesBatch, err) + if len(mergableIngressesSlice) > 0 { + glog.V(3).Infof("Updating Endpoints for %v", mergableIngressesSlice) + err = lbc.configurator.UpdateEndpointsMergeableIngress(mergableIngressesSlice) + if err != nil { + glog.Errorf("Error updating endpoints for %v: %v", mergableIngressesSlice, err) + } } } } @@ -1138,9 +1139,8 @@ func (lbc *LoadBalancerController) IsNginxIngress(ing *extensions.Ingress) bool return class == lbc.ingressClass } return class == lbc.ingressClass || class == "" - } else { - return !lbc.useIngressClassOnly } + return !lbc.useIngressClassOnly } // isHealthCheckEnabled checks if health checks are enabled so we can only query pods if enabled. diff --git a/internal/nginx/configurator.go b/internal/nginx/configurator.go index b01689d2a2..8e2ab6f472 100644 --- a/internal/nginx/configurator.go +++ b/internal/nginx/configurator.go @@ -994,48 +994,33 @@ func (cnf *Configurator) UpdateEndpoints(ingExes []*IngressEx) error { } // UpdateEndpointsMergeableIngress updates endpoints in NGINX configuration for a mergeable Ingress resource -func (cnf *Configurator) UpdateEndpointsMergeableIngress(mergableIngressesBatch []*MergeableIngresses) error { +func (cnf *Configurator) UpdateEndpointsMergeableIngress(mergableIngressesSlice []*MergeableIngresses) error { reloadPlus := false - errors := []error{} - for i := range mergableIngressesBatch { - err := cnf.addOrUpdateMergeableIngress(mergableIngressesBatch[i]) + for i := range mergableIngressesSlice { + err := cnf.addOrUpdateMergeableIngress(mergableIngressesSlice[i]) if err != nil { - glog.V(3).Infof("Error adding or updating ingress %v/%v: %v", mergableIngressesBatch[i].Master.Ingress.Namespace, mergableIngressesBatch[i].Master.Ingress.Name, err) - errors = append(errors, err) - continue + return fmt.Errorf("Error adding or updating mergeableIngress %v/%v: %v", mergableIngressesSlice[i].Master.Ingress.Namespace, mergableIngressesSlice[i].Master.Ingress.Name, err) } if cnf.isPlus() { - for _, ing := range mergableIngressesBatch[i].Minions { + for _, ing := range mergableIngressesSlice[i].Minions { err = cnf.updatePlusEndpoints(ing) if err != nil { glog.Warningf("Couldn't update the endpoints via the API: %v; reloading configuration instead", err) - errors = append(errors, err) reloadPlus = true - break } } } } if cnf.isPlus() && !reloadPlus { - return consolodiateErrors(errors) + glog.V(3).Info("No need to reload nginx") + return nil } if err := cnf.nginx.Reload(); err != nil { - return fmt.Errorf("Error reloading NGINX: %v. additional errors: %v", err, consolodiateErrors(errors)) - } - return consolodiateErrors(errors) -} - -func consolodiateErrors(errors []error) error { - errorString := "" - if len(errors) == 0 { - return nil + return fmt.Errorf("Error reloading NGINX when updating endpoints for %v: %v", mergableIngressesSlice, err) } - for _, e := range errors { - errorString = errorString + e.Error() - } - return fmt.Errorf(errorString) + return nil } func (cnf *Configurator) updatePlusEndpoints(ingEx *IngressEx) error {