Skip to content

Commit

Permalink
prevent processing updates leading to sync when controller doing full…
Browse files Browse the repository at this point in the history
… sync at boot time (cloudnativelabs#400)
  • Loading branch information
murali-reddy authored Apr 18, 2018
1 parent 041c055 commit a1ecedf
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 19 deletions.
24 changes: 20 additions & 4 deletions pkg/controllers/network_policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type NetworkPolicyController struct {
syncPeriod time.Duration
MetricsEnabled bool
v1NetworkPolicy bool
readyForUpdates bool

// list of all active network policies expressed as networkPolicyInfo
networkPoliciesInfo *[]networkPolicyInfo
Expand Down Expand Up @@ -137,7 +138,7 @@ func (npc *NetworkPolicyController) Run(healthChan chan<- *ControllerHeartbeat,
} else {
sendHeartBeat(healthChan, "NPC")
}

npc.readyForUpdates = true
select {
case <-stopCh:
glog.Infof("Shutting down network policies controller")
Expand All @@ -152,6 +153,11 @@ func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) {
pod := obj.(*api.Pod)
glog.V(2).Infof("Received update to pod: %s/%s", pod.Namespace, pod.Name)

if !npc.readyForUpdates {
glog.V(3).Infof("Skipping update to pod: %s/%s, controller still performing bootup full-sync", pod.Namespace, pod.Name)
return
}

err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing network policy for the update to pod: %s/%s Error: %s", pod.Namespace, pod.Name, err)
Expand All @@ -162,6 +168,12 @@ func (npc *NetworkPolicyController) OnPodUpdate(obj interface{}) {
func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(obj interface{}) {
netpol := obj.(*networking.NetworkPolicy)
glog.V(2).Infof("Received update for network policy: %s/%s", netpol.Namespace, netpol.Name)

if !npc.readyForUpdates {
glog.V(3).Infof("Skipping update to network policy: %s/%s, controller still performing bootup full-sync", netpol.Namespace, netpol.Name)
return
}

err := npc.Sync()
if err != nil {
glog.Errorf("Error syncing network policy for the update to network policy: %s/%s Error: %s", netpol.Namespace, netpol.Name, err)
Expand Down Expand Up @@ -196,7 +208,7 @@ func (npc *NetworkPolicyController) Sync() error {
if npc.MetricsEnabled {
controllerIptablesSyncTime.WithLabelValues().Set(float64(endTime))
}
glog.V(2).Infof("sync iptables took %v", endTime)
glog.V(1).Infof("sync iptables took %v", endTime)
}()

glog.V(1).Info("Starting periodic sync of iptables")
Expand Down Expand Up @@ -1414,8 +1426,12 @@ func (npc *NetworkPolicyController) newPodEventHandler() cache.ResourceEventHand

},
UpdateFunc: func(oldObj, newObj interface{}) {
npc.OnPodUpdate(newObj)

newPoObj := newObj.(*api.Pod)
oldPoObj := oldObj.(*api.Pod)
if newPoObj.Status.Phase != oldPoObj.Status.Phase || newPoObj.Status.PodIP != oldPoObj.Status.PodIP {
// for the network policies, we are only interested in pod status phase change or IP change
npc.OnPodUpdate(newObj)
}
},
DeleteFunc: func(obj interface{}) {
npc.OnPodUpdate(obj)
Expand Down
19 changes: 11 additions & 8 deletions pkg/controllers/network_routes_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1429,17 +1429,18 @@ func (nrc *NetworkRoutingController) OnNodeUpdate(obj interface{}) {
}

func (nrc *NetworkRoutingController) OnServiceUpdate(obj interface{}) {
if !nrc.bgpServerStarted {
return
}

svc, ok := obj.(*v1core.Service)
if !ok {
glog.Errorf("cache indexer returned obj that is not type *v1.Service")
return
}

glog.V(1).Infof("Received update to service: %s/%s from watch API", svc.Namespace, svc.Name)
if !nrc.bgpServerStarted {
glog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync", svc.Namespace, svc.Name)
return
}

toAdvertise, toWithdraw, err := nrc.getVIPsForService(svc, true)
if err != nil {
glog.Errorf("error getting routes for service: %s, err: %s", svc.Name, err)
Expand Down Expand Up @@ -1483,10 +1484,6 @@ func (nrc *NetworkRoutingController) OnServiceDelete(obj interface{}) {
}

func (nrc *NetworkRoutingController) OnEndpointsUpdate(obj interface{}) {
if !nrc.bgpServerStarted {
return
}

ep, ok := obj.(*v1core.Endpoints)
if !ok {
glog.Errorf("cache indexer returned obj that is not type *v1.Endpoints")
Expand All @@ -1497,6 +1494,12 @@ func (nrc *NetworkRoutingController) OnEndpointsUpdate(obj interface{}) {
return
}

glog.V(1).Infof("Received update to endpoint: %s/%s from watch API", ep.Namespace, ep.Name)
if !nrc.bgpServerStarted {
glog.V(3).Infof("Skipping update to endpoint: %s/%s, controller still performing bootup full-sync", ep.Namespace, ep.Name)
return
}

svc, err := nrc.serviceForEndpoints(ep)
if err != nil {
glog.Errorf("failed to convert endpoints resource to service: %s", err)
Expand Down
21 changes: 14 additions & 7 deletions pkg/controllers/network_services_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ type NetworkServicesController struct {
nodeportBindOnAllIp bool
MetricsEnabled bool
ln LinuxNetworking
readyForUpdates bool

svcLister cache.Indexer
epLister cache.Indexer
Expand Down Expand Up @@ -246,7 +247,7 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *ControllerHeartbeat
} else {
sendHeartBeat(healthChan, "NSC")
}

nsc.readyForUpdates = true
select {
case <-stopCh:
glog.Info("Shutting down network services controller")
Expand Down Expand Up @@ -350,9 +351,6 @@ func (nsc *NetworkServicesController) publishMetrics(serviceInfoMap serviceInfoM

// OnEndpointsUpdate handle change in endpoints update from the API server
func (nsc *NetworkServicesController) OnEndpointsUpdate(obj interface{}) {
nsc.mu.Lock()
defer nsc.mu.Unlock()

ep, ok := obj.(*api.Endpoints)
if !ok {
glog.Error("could not convert endpoints update object to *v1.Endpoints")
Expand All @@ -364,6 +362,12 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(obj interface{}) {
}

glog.V(1).Infof("Received update to endpoint: %s/%s from watch API", ep.Namespace, ep.Name)
if !nsc.readyForUpdates {
glog.V(3).Infof("Skipping update to endpoint: %s/%s, controller still performing bootup full-sync", ep.Namespace, ep.Name)
return
}
nsc.mu.Lock()
defer nsc.mu.Unlock()

// build new service and endpoints map to reflect the change
newServiceMap := nsc.buildServicesInfo()
Expand All @@ -381,16 +385,19 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(obj interface{}) {

// OnServiceUpdate handle change in service update from the API server
func (nsc *NetworkServicesController) OnServiceUpdate(obj interface{}) {
nsc.mu.Lock()
defer nsc.mu.Unlock()

svc, ok := obj.(*api.Service)
if !ok {
glog.Error("could not convert service update object to *v1.Service")
return
}

glog.V(1).Infof("Received update to service: %s/%s from watch API", svc.Namespace, svc.Name)
if !nsc.readyForUpdates {
glog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync", svc.Namespace, svc.Name)
return
}
nsc.mu.Lock()
defer nsc.mu.Unlock()

// build new service and endpoints map to reflect the change
newServiceMap := nsc.buildServicesInfo()
Expand Down

0 comments on commit a1ecedf

Please sign in to comment.