Skip to content

Commit

Permalink
Fix data race updating ingress status (#1872)
Browse files Browse the repository at this point in the history
  • Loading branch information
aledbf authored Jan 2, 2018
1 parent da82974 commit a09527c
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 22 deletions.
2 changes: 1 addition & 1 deletion internal/ingress/controller/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (n *NGINXController) Start() {
}

if n.syncStatus != nil {
go n.syncStatus.Run(n.stopCh)
go n.syncStatus.Run()
}

go wait.Until(n.checkMissingSecrets, 30*time.Second, n.stopCh)
Expand Down
25 changes: 9 additions & 16 deletions internal/ingress/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const (

// Sync ...
type Sync interface {
Run(stopCh <-chan struct{})
Run()
Shutdown()
}

Expand Down Expand Up @@ -91,16 +91,8 @@ type statusSync struct {
}

// Run starts the loop to keep the status in sync
func (s statusSync) Run(stopCh <-chan struct{}) {
go s.elector.Run()
go wait.Forever(s.update, updateInterval)
go s.syncQueue.Run(time.Second, stopCh)
<-stopCh
}

func (s *statusSync) update() {
// send a dummy object to the queue to force a sync
s.syncQueue.Enqueue("sync status")
func (s statusSync) Run() {
s.elector.Run()
}

// Shutdown stop the sync. In case the instance is the leader it will remove the current IP
Expand Down Expand Up @@ -146,11 +138,6 @@ func (s *statusSync) sync(key interface{}) error {
return nil
}

if !s.elector.IsLeader() {
glog.V(2).Infof("skipping Ingress status update (I am not the current leader)")
return nil
}

addrs, err := s.runningAddresses()
if err != nil {
return err
Expand Down Expand Up @@ -188,6 +175,12 @@ func NewStatusSyncer(config Config) Sync {
callbacks := leaderelection.LeaderCallbacks{
OnStartedLeading: func(stop <-chan struct{}) {
glog.V(2).Infof("I am the new status update leader")
go st.syncQueue.Run(time.Second, stop)
wait.PollUntil(updateInterval, func() (bool, error) {
// send a dummy object to the queue to force a sync
st.syncQueue.Enqueue("sync status")
return false, nil
}, stop)
},
OnStoppedLeading: func() {
glog.V(2).Infof("I am not status update leader anymore")
Expand Down
9 changes: 4 additions & 5 deletions internal/ingress/status/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,8 @@ func TestStatusActions(t *testing.T) {

fk := fkSync.(statusSync)

ns := make(chan struct{})
// start it and wait for the election and syn actions
go fk.Run(ns)
go fk.Run()
// wait for the election
time.Sleep(100 * time.Millisecond)
// execute sync
Expand All @@ -294,6 +293,8 @@ func TestStatusActions(t *testing.T) {
t.Fatalf("returned %v but expected %v", fooIngress1CurIPs, newIPs)
}

time.Sleep(1 * time.Second)

// execute shutdown
fk.Shutdown()
// ingress should be empty
Expand All @@ -314,9 +315,6 @@ func TestStatusActions(t *testing.T) {
if oic.Status.LoadBalancer.Ingress[0].IP != "0.0.0.0" && oic.Status.LoadBalancer.Ingress[0].Hostname != "foo.bar.com" {
t.Fatalf("invalid ingress status for rule with different class")
}

// end test
ns <- struct{}{}
}

func TestCallback(t *testing.T) {
Expand All @@ -325,6 +323,7 @@ func TestCallback(t *testing.T) {

func TestKeyfunc(t *testing.T) {
fk := buildStatusSync()

i := "foo_base_pod"
r, err := fk.keyfunc(i)

Expand Down

0 comments on commit a09527c

Please sign in to comment.