Skip to content

Commit

Permalink
Reduce reloads (#362)
Browse files Browse the repository at this point in the history
Reduce the number of reloads for endpoint changes by batching the ingresses to be updated, writing the config files and (in the case of Plus) updating UpstreamServers via the API, before finally reloading NGINX at the end.
  • Loading branch information
isaachawley authored Sep 10, 2018

Verified

This commit was signed with the committer’s verified signature.
pdabelf5 Paul Abel
1 parent 47904ec commit 7e54578
Showing 4 changed files with 90 additions and 53 deletions.
48 changes: 29 additions & 19 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
@@ -277,44 +277,55 @@ func (lbc *LoadBalancerController) syncEndpoint(task queue.Task) {
if endpExists {
ings := lbc.getIngressForEndpoints(obj)

for _, ing := range ings {
if !lbc.IsNginxIngress(&ing) {
var ingExes []*nginx.IngressEx
var mergableIngressesSlice []*nginx.MergeableIngresses

for i := range ings {
if !lbc.IsNginxIngress(&ings[i]) {
continue
}
if utils.IsMinion(&ing) {
master, err := lbc.FindMasterForMinion(&ing)
if utils.IsMinion(&ings[i]) {
master, err := lbc.FindMasterForMinion(&ings[i])
if err != nil {
glog.Errorf("Ignoring Ingress %v(Minion): %v", ing.Name, err)
glog.Errorf("Ignoring Ingress %v(Minion): %v", ings[i].Name, err)
continue
}
if !lbc.configurator.HasIngress(master) {
if !lbc.configurator.HasMinion(master, &ings[i]) {
continue
}
mergeableIngresses, err := lbc.createMergableIngresses(master)
if err != nil {
glog.Errorf("Ignoring Ingress %v(Minion): %v", ing.Name, err)
glog.Errorf("Ignoring Ingress %v(Minion): %v", ings[i].Name, err)
continue
}

glog.V(3).Infof("Updating Endpoints for %v/%v", ing.Namespace, ing.Name)
err = lbc.configurator.UpdateEndpointsMergeableIngress(mergeableIngresses)
if err != nil {
glog.Errorf("Error updating endpoints for %v/%v: %v", ing.Namespace, ing.Name, err)
}
mergableIngressesSlice = append(mergableIngressesSlice, mergeableIngresses)
continue
}
if !lbc.configurator.HasIngress(&ing) {
if !lbc.configurator.HasIngress(&ings[i]) {
continue
}
ingEx, err := lbc.createIngress(&ing)
ingEx, err := lbc.createIngress(&ings[i])
if err != nil {
glog.Errorf("Error updating endpoints for %v/%v: %v, skipping", ing.Namespace, ing.Name, err)
glog.Errorf("Error updating endpoints for %v/%v: %v, skipping", &ings[i].Namespace, &ings[i].Name, err)
continue
}
glog.V(3).Infof("Updating Endpoints for %v/%v", ing.Namespace, ing.Name)
err = lbc.configurator.UpdateEndpoints(ingEx)
ingExes = append(ingExes, ingEx)
}

if len(ingExes) > 0 {
glog.V(3).Infof("Updating Endpoints for %v", ingExes)
err = lbc.configurator.UpdateEndpoints(ingExes)
if err != nil {
glog.Errorf("Error updating endpoints for %v/%v: %v", ing.Namespace, ing.Name, err)
glog.Errorf("Error updating endpoints for %v: %v", ingExes, err)
}
}

if len(mergableIngressesSlice) > 0 {
glog.V(3).Infof("Updating Endpoints for %v", mergableIngressesSlice)
err = lbc.configurator.UpdateEndpointsMergeableIngress(mergableIngressesSlice)
if err != nil {
glog.Errorf("Error updating endpoints for %v: %v", mergableIngressesSlice, err)
}
}
}
@@ -1130,7 +1141,6 @@ func (lbc *LoadBalancerController) IsNginxIngress(ing *extensions.Ingress) bool
return class == lbc.ingressClass || class == ""
}
return !lbc.useIngressClassOnly

}

// isHealthCheckEnabled checks if health checks are enabled so we can only query pods if enabled.
69 changes: 41 additions & 28 deletions internal/nginx/configurator.go
Original file line number Diff line number Diff line change
@@ -962,51 +962,64 @@ func (cnf *Configurator) DeleteIngress(key string) error {
return nil
}

// UpdateEndpoints updates endpoints in NGINX configuration for the Ingress resource
func (cnf *Configurator) UpdateEndpoints(ingEx *IngressEx) error {
err := cnf.addOrUpdateIngress(ingEx)
if err != nil {
return fmt.Errorf("Error adding or updating ingress %v/%v: %v", ingEx.Ingress.Namespace, ingEx.Ingress.Name, err)
}
// UpdateEndpoints updates endpoints in NGINX configuration for the Ingress resources
func (cnf *Configurator) UpdateEndpoints(ingExes []*IngressEx) error {
reloadPlus := false

if cnf.isPlus() {
err = cnf.updatePlusEndpoints(ingEx)
if err == nil {
return nil
for _, ingEx := range ingExes {
err := cnf.addOrUpdateIngress(ingEx)
if err != nil {
return fmt.Errorf("Error adding or updating ingress %v/%v: %v", ingEx.Ingress.Namespace, ingEx.Ingress.Name, err)
}
glog.Warningf("Couldn't update the endpoints via the API: %v; reloading configuration instead", err)

if cnf.isPlus() {
err := cnf.updatePlusEndpoints(ingEx)
if err != nil {
glog.Warningf("Couldn't update the endpoints via the API: %v; reloading configuration instead", err)
reloadPlus = true
}
}
}

if cnf.isPlus() && !reloadPlus {
glog.V(3).Info("No need to reload nginx")
return nil
}

if err := cnf.nginx.Reload(); err != nil {
return fmt.Errorf("Error reloading NGINX when updating endpoints for %v/%v: %v", ingEx.Ingress.Namespace, ingEx.Ingress.Name, err)
return fmt.Errorf("Error reloading NGINX when updating endpoints: %v", err)
}

return nil
}

// UpdateEndpointsMergeableIngress updates endpoints in NGINX configuration for a mergeable Ingress resource
func (cnf *Configurator) UpdateEndpointsMergeableIngress(mergeableIngs *MergeableIngresses) error {
err := cnf.addOrUpdateMergeableIngress(mergeableIngs)
if err != nil {
return fmt.Errorf("Error adding or updating ingress %v/%v: %v", mergeableIngs.Master.Ingress.Namespace, mergeableIngs.Master.Ingress.Name, err)
}
func (cnf *Configurator) UpdateEndpointsMergeableIngress(mergableIngressesSlice []*MergeableIngresses) error {
reloadPlus := false
for i := range mergableIngressesSlice {
err := cnf.addOrUpdateMergeableIngress(mergableIngressesSlice[i])
if err != nil {
return fmt.Errorf("Error adding or updating mergeableIngress %v/%v: %v", mergableIngressesSlice[i].Master.Ingress.Namespace, mergableIngressesSlice[i].Master.Ingress.Name, err)
}

if cnf.isPlus() {
for _, ing := range mergeableIngs.Minions {
err = cnf.updatePlusEndpoints(ing)
if err != nil {
glog.Warningf("Couldn't update the endpoints via the API: %v; reloading configuration instead", err)
break
if cnf.isPlus() {
for _, ing := range mergableIngressesSlice[i].Minions {
err = cnf.updatePlusEndpoints(ing)
if err != nil {
glog.Warningf("Couldn't update the endpoints via the API: %v; reloading configuration instead", err)
reloadPlus = true
}
}
}
if err == nil {
return nil
}
}
if cnf.isPlus() && !reloadPlus {
glog.V(3).Info("No need to reload nginx")
return nil
}

if err := cnf.nginx.Reload(); err != nil {
return fmt.Errorf("Error reloading NGINX when updating endpoints for %v/%v: %v", mergeableIngs.Master.Ingress.Namespace, mergeableIngs.Master.Ingress.Name, err)
return fmt.Errorf("Error reloading NGINX when updating endpoints for %v: %v", mergableIngressesSlice, err)
}

return nil
}

16 changes: 10 additions & 6 deletions internal/nginx/configurator_test.go
Original file line number Diff line number Diff line change
@@ -718,14 +718,15 @@ func TestUpdateEndpoints(t *testing.T) {
}

ingress := createCafeIngressEx()
err = cnf.UpdateEndpoints(&ingress)
ingresses := []*IngressEx{&ingress}
err = cnf.UpdateEndpoints(ingresses)
if err != nil {
t.Errorf("UpdateEndpoints returned\n%v, but expected \n%v", err, nil)
}

// test with OSS Configurator
cnf.nginxAPI = nil
err = cnf.UpdateEndpoints(&ingress)
err = cnf.UpdateEndpoints(ingresses)
if err != nil {
t.Errorf("UpdateEndpoints returned\n%v, but expected \n%v", err, nil)
}
@@ -738,14 +739,15 @@ func TestUpdateEndpointsMergeableIngress(t *testing.T) {
}

mergeableIngress := createMergeableCafeIngress()
err = cnf.UpdateEndpointsMergeableIngress(mergeableIngress)
mergeableIngresses := []*MergeableIngresses{mergeableIngress}
err = cnf.UpdateEndpointsMergeableIngress(mergeableIngresses)
if err != nil {
t.Errorf("UpdateEndpointsMergeableIngress returned \n%v, but expected \n%v", err, nil)
}

// test with OSS Configurator
cnf.nginxAPI = nil
err = cnf.UpdateEndpointsMergeableIngress(mergeableIngress)
err = cnf.UpdateEndpointsMergeableIngress(mergeableIngresses)
if err != nil {
t.Errorf("UpdateEndpointsMergeableIngress returned \n%v, but expected \n%v", err, nil)
}
@@ -758,7 +760,8 @@ func TestUpdateEndpointsFailsWithInvalidTemplate(t *testing.T) {
}

ingress := createCafeIngressEx()
err = cnf.UpdateEndpoints(&ingress)
ingresses := []*IngressEx{&ingress}
err = cnf.UpdateEndpoints(ingresses)
if err == nil {
t.Errorf("UpdateEndpoints returned\n%v, but expected \n%v", nil, "template execution error")
}
@@ -771,7 +774,8 @@ func TestUpdateEndpointsMergeableIngressFailsWithInvalidTemplate(t *testing.T) {
}

mergeableIngress := createMergeableCafeIngress()
err = cnf.UpdateEndpointsMergeableIngress(mergeableIngress)
mergeableIngresses := []*MergeableIngresses{mergeableIngress}
err = cnf.UpdateEndpointsMergeableIngress(mergeableIngresses)
if err == nil {
t.Errorf("UpdateEndpointsMergeableIngress returned \n%v, but expected \n%v", nil, "template execution error")
}
10 changes: 10 additions & 0 deletions internal/nginx/ingress.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package nginx

import (
"fmt"

api_v1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
)
@@ -60,3 +62,11 @@ var minionInheritanceList = map[string]bool{
"nginx.org/max-fails": true,
"nginx.org/fail-timeout": true,
}

func (ingEx *IngressEx) String() string {
if ingEx.Ingress == nil {
return "IngressEx has no Ingress"
}

return fmt.Sprintf("%v/%v", ingEx.Ingress.Namespace, ingEx.Ingress.Name)
}

0 comments on commit 7e54578

Please sign in to comment.