Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix status update in case of connection errors #3267

Merged
merged 2 commits into from
Oct 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions build/e2e-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,6 @@ fi

SCRIPT_ROOT=$(dirname ${BASH_SOURCE})/..

mkdir -p ${SCRIPT_ROOT}/test/binaries

TEST_BINARIES=$( cd "${SCRIPT_ROOT}/test/binaries" ; pwd -P )

export PATH=${TEST_BINARIES}:$PATH

if ! [ -x "$(command -v kubectl)" ]; then
echo "downloading kubectl..."
curl -sSLo ${TEST_BINARIES}/kubectl \
https://storage.googleapis.com/kubernetes-release/release/v1.11.0/bin/linux/amd64/kubectl
chmod +x ${TEST_BINARIES}/kubectl
fi

ginkgo build ./test/e2e

exec -- \
Expand Down
2 changes: 1 addition & 1 deletion build/go-in-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ if [ "$missing" = true ];then
exit 1
fi

E2E_IMAGE=quay.io/kubernetes-ingress-controller/e2e:v10292018-5d42f0e
E2E_IMAGE=quay.io/kubernetes-ingress-controller/e2e:v10292018-240c7274b

DOCKER_OPTS=${DOCKER_OPTS:-""}

Expand Down
6 changes: 5 additions & 1 deletion images/e2e/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM quay.io/kubernetes-ingress-controller/nginx-amd64:0.63
FROM quay.io/kubernetes-ingress-controller/nginx-amd64:0.66

RUN clean-install \
g++ \
Expand Down Expand Up @@ -61,3 +61,7 @@ RUN luarocks install luacheck \

RUN go get github.com/onsi/ginkgo/ginkgo \
&& go get golang.org/x/lint/golint

RUN curl -Lo /usr/local/bin/kubectl https://storage.googleapis.com/kubernetes-release/release/v1.12.0/bin/linux/amd64/kubectl \
&& chmod +x /usr/local/bin/kubectl

3 changes: 1 addition & 2 deletions internal/ingress/controller/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package controller

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -257,7 +256,7 @@ func (n *NGINXController) Start() {
n.store.Run(n.stopCh)

if n.syncStatus != nil {
go n.syncStatus.Run(context.Background())
go n.syncStatus.Run()
}

cmd := nginxExecCommand()
Expand Down
163 changes: 91 additions & 72 deletions internal/ingress/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const (

// Sync ...
type Sync interface {
Run(ctx context.Context)
Run()
Shutdown()
}

Expand Down Expand Up @@ -93,22 +93,97 @@ type statusSync struct {
pod *k8s.PodInfo

elector *leaderelection.LeaderElector

// workqueue used to keep in sync the status IP/s
// in the Ingress rules
syncQueue *task.Queue
}

// Run starts the loop to keep the status in sync
func (s statusSync) Run(ctx context.Context) {
s.elector.Run(ctx)
func (s statusSync) Run() {
// we need to use the defined ingress class to allow multiple leaders
// in order to update information about ingress status
electionID := fmt.Sprintf("%v-%v", s.Config.ElectionID, s.Config.DefaultIngressClass)
if s.Config.IngressClass != "" {
electionID = fmt.Sprintf("%v-%v", s.Config.ElectionID, s.Config.IngressClass)
}

// start a new context
ctx := context.Background()
// allow to cancel the context in case we stop being the leader
leaderCtx, cancel := context.WithCancel(ctx)

var stopCh chan struct{}
callbacks := leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
glog.V(2).Infof("I am the new status update leader")
stopCh = make(chan struct{})
go s.syncQueue.Run(time.Second, stopCh)
// trigger initial sync
s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
// when this instance is the leader we need to enqueue
// an item to trigger the update of the Ingress status.
wait.PollUntil(updateInterval, func() (bool, error) {
s.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
return false, nil
}, stopCh)
},
OnStoppedLeading: func() {
glog.V(2).Infof("I am not status update leader anymore")
close(stopCh)

// cancel the context
cancel()

// start a new context and run the elector
leaderCtx, cancel = context.WithCancel(ctx)
go s.elector.Run(leaderCtx)
},
OnNewLeader: func(identity string) {
glog.Infof("new leader elected: %v", identity)
},
}

broadcaster := record.NewBroadcaster()
hostname, _ := os.Hostname()

recorder := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
Component: "ingress-leader-elector",
Host: hostname,
})

lock := resourcelock.ConfigMapLock{
ConfigMapMeta: metav1.ObjectMeta{Namespace: s.pod.Namespace, Name: electionID},
Client: s.Config.Client.CoreV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: s.pod.Name,
EventRecorder: recorder,
},
}

ttl := 30 * time.Second
le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: &lock,
LeaseDuration: ttl,
RenewDeadline: ttl / 2,
RetryPeriod: ttl / 4,
Callbacks: callbacks,
})
if err != nil {
glog.Fatalf("unexpected error starting leader election: %v", err)
}
s.elector = le

go le.Run(leaderCtx)
}

// Shutdown stop the sync. In case the instance is the leader it will remove the current IP
// if there is no other instances running.
func (s statusSync) Shutdown() {
go s.syncQueue.Shutdown()

// remove IP from Ingress
if !s.elector.IsLeader() {
if s.elector != nil && !s.elector.IsLeader() {
return
}

Expand Down Expand Up @@ -146,6 +221,10 @@ func (s *statusSync) sync(key interface{}) error {
return nil
}

if s.elector != nil && !s.elector.IsLeader() {
return fmt.Errorf("i am not the current leader. Skiping status update")
}

addrs, err := s.runningAddresses()
if err != nil {
return err
Expand Down Expand Up @@ -173,66 +252,6 @@ func NewStatusSyncer(config Config) Sync {
}
st.syncQueue = task.NewCustomTaskQueue(st.sync, st.keyfunc)

// we need to use the defined ingress class to allow multiple leaders
// in order to update information about ingress status
electionID := fmt.Sprintf("%v-%v", config.ElectionID, config.DefaultIngressClass)
if config.IngressClass != "" {
electionID = fmt.Sprintf("%v-%v", config.ElectionID, config.IngressClass)
}

var stopCh chan struct{}
callbacks := leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
glog.V(2).Infof("I am the new status update leader")
stopCh = make(chan struct{})
go st.syncQueue.Run(time.Second, stopCh)
// when this instance is the leader we need to enqueue
// an item to trigger the update of the Ingress status.
wait.PollUntil(updateInterval, func() (bool, error) {
st.syncQueue.EnqueueTask(task.GetDummyObject("sync status"))
return false, nil
}, stopCh)
},
OnStoppedLeading: func() {
glog.V(2).Infof("I am not status update leader anymore")
close(stopCh)
},
OnNewLeader: func(identity string) {
glog.Infof("new leader elected: %v", identity)
},
}

broadcaster := record.NewBroadcaster()
hostname, _ := os.Hostname()

recorder := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
Component: "ingress-leader-elector",
Host: hostname,
})

lock := resourcelock.ConfigMapLock{
ConfigMapMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: electionID},
Client: config.Client.CoreV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: pod.Name,
EventRecorder: recorder,
},
}

ttl := 30 * time.Second
le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: &lock,
LeaseDuration: ttl,
RenewDeadline: ttl / 2,
RetryPeriod: ttl / 4,
Callbacks: callbacks,
})

if err != nil {
glog.Fatalf("unexpected error starting leader election: %v", err)
}

st.elector = le
return st
}

Expand Down Expand Up @@ -333,6 +352,13 @@ func (s *statusSync) updateStatus(newIngressPoint []apiv1.LoadBalancerIngress) {
sort.SliceStable(newIngressPoint, lessLoadBalancerIngress(newIngressPoint))

for _, ing := range ings {
curIPs := ing.Status.LoadBalancer.Ingress
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block of code was moved here to avoid running unnecessary goroutines without an actual update to run

sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))
if ingressSliceEqual(curIPs, newIngressPoint) {
glog.V(3).Infof("skipping update of Ingress %v/%v (no change)", ing.Namespace, ing.Name)
continue
}

batch.Queue(runUpdate(ing, newIngressPoint, s.Client))
}

Expand All @@ -347,14 +373,6 @@ func runUpdate(ing *extensions.Ingress, status []apiv1.LoadBalancerIngress,
return nil, nil
}

curIPs := ing.Status.LoadBalancer.Ingress
sort.SliceStable(curIPs, lessLoadBalancerIngress(curIPs))

if ingressSliceEqual(status, curIPs) {
glog.V(3).Infof("skipping update of Ingress %v/%v (no change)", ing.Namespace, ing.Name)
return true, nil
}

ingClient := client.ExtensionsV1beta1().Ingresses(ing.Namespace)

currIng, err := ingClient.Get(ing.Name, metav1.GetOptions{})
Expand Down Expand Up @@ -398,5 +416,6 @@ func ingressSliceEqual(lhs, rhs []apiv1.LoadBalancerIngress) bool {
return false
}
}

return true
}
3 changes: 1 addition & 2 deletions internal/ingress/status/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package status

import (
"context"
"os"
"testing"
"time"
Expand Down Expand Up @@ -298,7 +297,7 @@ func TestStatusActions(t *testing.T) {
fk := fkSync.(statusSync)

// start it and wait for the election and syn actions
go fk.Run(context.Background())
go fk.Run()
// wait for the election
time.Sleep(100 * time.Millisecond)
// execute sync
Expand Down
5 changes: 2 additions & 3 deletions test/e2e/annotations/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ func execInfluxDBCommand(pod *corev1.Pod, command string) (string, error) {
execErr bytes.Buffer
)

args := fmt.Sprintf("kubectl exec --namespace %v %v -- %v", pod.Namespace, pod.Name, command)
cmd := exec.Command("/bin/bash", "-c", args)
cmd := exec.Command("/bin/bash", "-c", fmt.Sprintf("%v exec --namespace %s %s -- %s", framework.KubectlPath, pod.Namespace, pod.Name, command))
cmd.Stdout = &execOut
cmd.Stderr = &execErr

Expand All @@ -195,7 +194,7 @@ func execInfluxDBCommand(pod *corev1.Pod, command string) (string, error) {
}

if err != nil {
return "", fmt.Errorf("could not execute: %v", err)
return "", fmt.Errorf("could not execute '%s %s': %v", cmd.Path, cmd.Args, err)
}

return execOut.String(), nil
Expand Down
11 changes: 9 additions & 2 deletions test/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ limitations under the License.
package e2e

import (
"os"
"testing"

"github.com/golang/glog"
"github.com/onsi/ginkgo"
"github.com/onsi/ginkgo/config"
"github.com/onsi/gomega"
"k8s.io/apiserver/pkg/util/logs"

// required
_ "k8s.io/client-go/plugin/pkg/client/auth"

Expand All @@ -36,6 +37,7 @@ import (
_ "k8s.io/ingress-nginx/test/e2e/servicebackend"
_ "k8s.io/ingress-nginx/test/e2e/settings"
_ "k8s.io/ingress-nginx/test/e2e/ssl"
_ "k8s.io/ingress-nginx/test/e2e/status"
)

// RunE2ETests checks configuration parameters (specified through flags) and then runs
Expand All @@ -50,7 +52,12 @@ func RunE2ETests(t *testing.T) {
config.GinkgoConfig.SkipString = `\[Flaky\]|\[Feature:.+\]`
}

glog.Infof("Starting e2e run %q on Ginkgo node %d", framework.RunID, config.GinkgoConfig.ParallelNode)
if os.Getenv("KUBECTL_PATH") != "" {
framework.KubectlPath = os.Getenv("KUBECTL_PATH")
framework.Logf("Using kubectl path '%s'", framework.KubectlPath)
}

framework.Logf("Starting e2e run %q on Ginkgo node %d", framework.RunID, config.GinkgoConfig.ParallelNode)
ginkgo.RunSpecs(t, "nginx-ingress-controller e2e suite")
}

Expand Down
Loading