From 2e20cef60808dc0f763a585f9ee6e55a17812a70 Mon Sep 17 00:00:00 2001
From: Bowei Du <bowei@google.com>
Date: Wed, 20 May 2020 10:13:52 -0700
Subject: [PATCH] Wait for caches to sync before running node sync

There can be a race condition where the queue starts processing items
before the caches have completed their initial sync.

Also improve logging around node sync to make future debugging easier.
---
 pkg/controller/node.go     | 16 +++++++++++++++-
 pkg/instances/instances.go | 22 ++++++++++++++++------
 2 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/pkg/controller/node.go b/pkg/controller/node.go
index 77b80a471a..52b4a05a3f 100644
--- a/pkg/controller/node.go
+++ b/pkg/controller/node.go
@@ -17,12 +17,15 @@ limitations under the License.
 package controller
 
 import (
+	"time"
+
 	apiv1 "k8s.io/api/core/v1"
 	listers "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/ingress-gce/pkg/context"
 	"k8s.io/ingress-gce/pkg/instances"
 	"k8s.io/ingress-gce/pkg/utils"
+	"k8s.io/klog"
 )
 
 // NodeController synchronizes the state of the nodes to the unmanaged instance
@@ -34,6 +37,9 @@ type NodeController struct {
 	queue utils.TaskQueue
 	// instancePool is a NodePool to manage kubernetes nodes.
 	instancePool instances.NodePool
+	// hasSynced returns true if relevant caches have done their initial
+	// synchronization.
+	hasSynced func() bool
 }
 
 // NewNodeController returns a new node update controller.
@@ -41,6 +47,7 @@ func NewNodeController(ctx *context.ControllerContext, instancePool instances.No
 	c := &NodeController{
 		lister:       ctx.NodeInformer.GetIndexer(),
 		instancePool: instancePool,
+		hasSynced:    ctx.HasSynced,
 	}
 	c.queue = utils.NewPeriodicTaskQueue("", "nodes", c.sync)
 
@@ -60,8 +67,15 @@ func NewNodeController(ctx *context.ControllerContext, instancePool instances.No
 	return c
 }
 
-// Run a goroutine to process updates for the controller.
+// Run waits for the caches to sync and then runs the queue. This must be run
+// in a separate goroutine (method will block until queue shutdown).
 func (c *NodeController) Run() {
+	start := time.Now()
+	for !c.hasSynced() {
+		klog.V(2).Infof("Waiting for hasSynced (%s elapsed)", time.Since(start))
+		time.Sleep(1 * time.Second)
+	}
+	klog.V(2).Infof("Caches synced (took %s)", time.Since(start))
 	c.queue.Run()
 }
 
diff --git a/pkg/instances/instances.go b/pkg/instances/instances.go
index aaf7e30e0e..324aba0ec1 100644
--- a/pkg/instances/instances.go
+++ b/pkg/instances/instances.go
@@ -19,6 +19,7 @@ package instances
 import (
 	"fmt"
 	"net/http"
+	"time"
 
 	"k8s.io/ingress-gce/pkg/utils/namer"
 	"k8s.io/klog"
@@ -277,9 +278,9 @@ func (i *Instances) Remove(groupName string, names []string) error {
 	return fmt.Errorf("%v", errs)
 }
 
-// Sync syncs kubernetes instances with the instances in the instance group.
+// Sync nodes with the instances in the instance group.
 func (i *Instances) Sync(nodes []string) (err error) {
-	klog.V(4).Infof("Syncing nodes %v", nodes)
+	klog.V(2).Infof("Syncing nodes %v", nodes)
 
 	defer func() {
 		// The node pool is only responsible for syncing nodes to instance
@@ -296,6 +297,7 @@ func (i *Instances) Sync(nodes []string) (err error) {
 
 	pool, err := i.List()
 	if err != nil {
+		klog.Errorf("List error: %v", err)
 		return err
 	}
 
@@ -303,6 +305,7 @@ func (i *Instances) Sync(nodes []string) (err error) {
 		gceNodes := sets.NewString()
 		gceNodes, err = i.list(igName)
 		if err != nil {
+			klog.Errorf("list(%q) error: %v", igName, err)
 			return err
 		}
 		kubeNodes := sets.NewString(nodes...)
@@ -313,16 +316,23 @@ func (i *Instances) Sync(nodes []string) (err error) {
 
 		removeNodes := gceNodes.Difference(kubeNodes).List()
 		addNodes := kubeNodes.Difference(gceNodes).List()
+
+		klog.V(2).Infof("Removing %d, adding %d nodes", len(removeNodes), len(addNodes))
+
+		start := time.Now()
 		if len(removeNodes) != 0 {
-			klog.V(4).Infof("Removing nodes from IG: %v", removeNodes)
-			if err = i.Remove(igName, removeNodes); err != nil {
+			err = i.Remove(igName, removeNodes)
+			klog.V(2).Infof("Remove(%q, _) = %v (took %s); nodes = %v", igName, err, time.Now().Sub(start), removeNodes)
+			if err != nil {
 				return err
 			}
 		}
 
+		start = time.Now()
 		if len(addNodes) != 0 {
-			klog.V(4).Infof("Adding nodes to IG: %v", addNodes)
-			if err = i.Add(igName, addNodes); err != nil {
+			err = i.Add(igName, addNodes)
+			klog.V(2).Infof("Add(%q, _) = %v (took %s); nodes = %v", igName, err, time.Now().Sub(start), addNodes)
+			if err != nil {
 				return err
 			}
 		}