Skip to content

Commit

Permalink
Fix creation of NEGs in the new zone when cluster spans to the new zone
Browse files Browse the repository at this point in the history
  • Loading branch information
gauravkghildiyal committed Aug 26, 2022
1 parent cfcb825 commit 0b294be
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 79 deletions.
22 changes: 11 additions & 11 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func NewController(
})
}

nodeEventHandler := cache.ResourceEventHandlerFuncs{
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node := obj.(*apiv1.Node)
negController.enqueueNode(node)
Expand All @@ -301,20 +301,20 @@ func NewController(
node := obj.(*apiv1.Node)
negController.enqueueNode(node)
},
}

if negController.runL4 {
nodeEventHandler.UpdateFunc = func(old, cur interface{}) {
UpdateFunc: func(old, cur interface{}) {
oldNode := old.(*apiv1.Node)
currentNode := cur.(*apiv1.Node)
candidateNodeCheck := utils.CandidateNodesPredicateIncludeUnreadyExcludeUpgradingNodes
if candidateNodeCheck(oldNode) != candidateNodeCheck(currentNode) {
logger.Info("Node has changed, enqueueing", "node", klog.KObj(currentNode))

vmIpCandidateNodeCheck := negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpEndpointType)
vmIpPortCandidateNodeCheck := negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpPortEndpointType)

if vmIpCandidateNodeCheck(oldNode) != vmIpCandidateNodeCheck(currentNode) ||
vmIpPortCandidateNodeCheck(oldNode) != vmIpPortCandidateNodeCheck(currentNode) {
logger.Info("Node has changed, enqueueing", "node", currentNode.Name)
negController.enqueueNode(currentNode)
}
}
}
nodeInformer.AddEventHandler(nodeEventHandler)
},
})

if enableAsm {
negController.enableASM = enableAsm
Expand Down
152 changes: 85 additions & 67 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,12 @@ type syncerManager struct {
enableNonGcpMode bool
enableEndpointSlices bool

// zoneMap keeps track of the last set of zones the neg controller
// has seen. zoneMap is protected by the mu mutex.
zoneMap map[string]struct{}

logger klog.Logger

// zone maps keep track of the last set of zones the neg controller has seen
// for their respective NEG types. zone maps are protected by the mu mutex.
vmIpZoneMap map[string]struct{}
vmIpPortZoneMap map[string]struct{}
}

func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
Expand All @@ -117,14 +118,10 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
enableEndpointSlices bool,
logger klog.Logger) *syncerManager {

zones, err := zoneGetter.ListZones(utils.AllNodesPredicate)
if err != nil {
logger.V(3).Info("Unable to initialize zone map in neg manager", "err", err)
}
zoneMap := make(map[string]struct{})
for _, zone := range zones {
zoneMap[zone] = struct{}{}
}
var vmIpZoneMap, vmIpPortZoneMap map[string]struct{}
updateZoneMap(&vmIpZoneMap, negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpEndpointType), zoneGetter, logger)
updateZoneMap(&vmIpPortZoneMap, negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpPortEndpointType), zoneGetter, logger)

return &syncerManager{
namer: namer,
recorder: recorder,
Expand All @@ -142,8 +139,9 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
kubeSystemUID: kubeSystemUID,
enableNonGcpMode: enableNonGcpMode,
enableEndpointSlices: enableEndpointSlices,
zoneMap: zoneMap,
logger: logger,
vmIpZoneMap: vmIpZoneMap,
vmIpPortZoneMap: vmIpPortZoneMap,
}
}

Expand Down Expand Up @@ -289,21 +287,40 @@ func (manager *syncerManager) SyncNodes() {
defer manager.mu.Unlock()

// When a zone change occurs (new zone is added or deleted), a sync should be triggered
isZoneChange := manager.updateZoneMap()
isVmIpZoneChange := updateZoneMap(&manager.vmIpZoneMap, negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpEndpointType), manager.zoneGetter, manager.logger)
isVmIpPortZoneChange := updateZoneMap(&manager.vmIpPortZoneMap, negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpPortEndpointType), manager.zoneGetter, manager.logger)

for key, syncer := range manager.syncerMap {
needSync := isZoneChange || key.NegType == negtypes.VmIpEndpointType
if needSync && !syncer.IsStopped() {
syncer.Sync()
if syncer.IsStopped() {
continue
}

switch key.NegType {

case negtypes.VmIpEndpointType:
if isVmIpZoneChange {
syncer.Sync()
}

case negtypes.VmIpPortEndpointType, negtypes.NonGCPPrivateEndpointType:
if isVmIpPortZoneChange {
syncer.Sync()
}

default:
manager.logger.Error(nil, "Not triggering sync for syncer of unknown type", "syncerType", key.NegType)
}
}
}

// updateZoneMap updates the manager's zone map with the current zones and returns true if the
// zones have changed. The caller must obtain mu mutex before calling this function
func (manager *syncerManager) updateZoneMap() bool {
zones, err := manager.zoneGetter.ListZones(utils.AllNodesPredicate)
// updateZoneMap updates the existingZoneMap with the latest zones and returns
// true if the zones have changed. The caller must obtain mu mutex of the
// manager before calling this function since it modifies the passed
// existingZoneMap.
func updateZoneMap(existingZoneMap *map[string]struct{}, candidateNodePredicate utils.NodeConditionPredicate, zoneGetter negtypes.ZoneGetter, logger klog.Logger) bool {
zones, err := zoneGetter.ListZones(candidateNodePredicate)
if err != nil {
manager.logger.Error(err, "Unable to list zones")
logger.Error(err, "Unable to list zones")
return false
}

Expand All @@ -312,8 +329,9 @@ func (manager *syncerManager) updateZoneMap() bool {
newZoneMap[zone] = struct{}{}
}

zoneChange := !reflect.DeepEqual(manager.zoneMap, newZoneMap)
manager.zoneMap = newZoneMap
zoneChange := !reflect.DeepEqual(*existingZoneMap, newZoneMap)
*existingZoneMap = newZoneMap

return zoneChange
}

Expand Down Expand Up @@ -684,6 +702,49 @@ func (manager *syncerManager) ensureSvcNegCR(svcKey serviceKey, portInfo negtype
return nil
}

// getSyncerKey encodes a service namespace, name, service port and targetPort into a string key
func (manager *syncerManager) getSyncerKey(namespace, name string, servicePortKey negtypes.PortInfoMapKey, portInfo negtypes.PortInfo) negtypes.NegSyncerKey {
networkEndpointType := negtypes.VmIpPortEndpointType
calculatorMode := negtypes.L7Mode
if manager.enableNonGcpMode {
networkEndpointType = negtypes.NonGCPPrivateEndpointType
}
if portInfo.PortTuple.Empty() {
networkEndpointType = negtypes.VmIpEndpointType
calculatorMode = portInfo.EpCalculatorMode
}

return negtypes.NegSyncerKey{
Namespace: namespace,
Name: name,
NegName: portInfo.NegName,
PortTuple: portInfo.PortTuple,
Subset: servicePortKey.Subset,
SubsetLabels: portInfo.SubsetLabels,
NegType: networkEndpointType,
EpCalculatorMode: calculatorMode,
}
}

// removeCommonPorts removes duplicate ports in p1 and p2 if the corresponding port info is converted to the same syncerKey.
// When both ports can be converted to the same syncerKey, that means the underlying NEG syncer and NEG configuration is exactly the same.
// For example, this function effectively removes duplicate port with different readiness gate flag if the rest of the field in port info is the same.
func (manager *syncerManager) removeCommonPorts(p1, p2 negtypes.PortInfoMap) {
for port, portInfo1 := range p1 {
portInfo2, ok := p2[port]
if !ok {
continue
}

syncerKey1 := manager.getSyncerKey("", "", port, portInfo1)
syncerKey2 := manager.getSyncerKey("", "", port, portInfo2)
if reflect.DeepEqual(syncerKey1, syncerKey2) {
delete(p1, port)
delete(p2, port)
}
}
}

func ensureNegCRLabels(negCR *negv1beta1.ServiceNetworkEndpointGroup, labels map[string]string, logger klog.Logger) (bool, error) {
needsUpdate := false
existingLabels := negCR.GetLabels()
Expand Down Expand Up @@ -743,52 +804,9 @@ func patchNegStatus(svcNegClient svcnegclient.Interface, oldNeg, newNeg negv1bet
return svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(oldNeg.Namespace).Patch(context.Background(), oldNeg.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
}

// getSyncerKey encodes a service namespace, name, service port and targetPort into a string key
func (manager *syncerManager) getSyncerKey(namespace, name string, servicePortKey negtypes.PortInfoMapKey, portInfo negtypes.PortInfo) negtypes.NegSyncerKey {
networkEndpointType := negtypes.VmIpPortEndpointType
calculatorMode := negtypes.L7Mode
if manager.enableNonGcpMode {
networkEndpointType = negtypes.NonGCPPrivateEndpointType
}
if portInfo.PortTuple.Empty() {
networkEndpointType = negtypes.VmIpEndpointType
calculatorMode = portInfo.EpCalculatorMode
}

return negtypes.NegSyncerKey{
Namespace: namespace,
Name: name,
NegName: portInfo.NegName,
PortTuple: portInfo.PortTuple,
Subset: servicePortKey.Subset,
SubsetLabels: portInfo.SubsetLabels,
NegType: networkEndpointType,
EpCalculatorMode: calculatorMode,
}
}

func getServiceKey(namespace, name string) serviceKey {
return serviceKey{
namespace: namespace,
name: name,
}
}

// removeCommonPorts removes duplicate ports in p1 and p2 if the corresponding port info is converted to the same syncerKey.
// When both ports can be converted to the same syncerKey, that means the underlying NEG syncer and NEG configuration is exactly the same.
// For example, this function effectively removes duplicate port with different readiness gate flag if the rest of the field in port info is the same.
func (manager *syncerManager) removeCommonPorts(p1, p2 negtypes.PortInfoMap) {
for port, portInfo1 := range p1 {
portInfo2, ok := p2[port]
if !ok {
continue
}

syncerKey1 := manager.getSyncerKey("", "", port, portInfo1)
syncerKey2 := manager.getSyncerKey("", "", port, portInfo2)
if reflect.DeepEqual(syncerKey1, syncerKey2) {
delete(p1, port)
delete(p2, port)
}
}
}
2 changes: 1 addition & 1 deletion pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1459,7 +1459,7 @@ func TestSyncNodesConditions(t *testing.T) {
},
{
desc: "vm ip neg, zones are the same",
expectSync: true,
expectSync: false,
negType: negtypes.VmIpEndpointType,
},
{
Expand Down
8 changes: 8 additions & 0 deletions pkg/neg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,14 @@ func EndpointsDataFromEndpointSlices(slices []*discovery.EndpointSlice) []Endpoi
func NodePredicateForEndpointCalculatorMode(mode EndpointsCalculatorMode) utils.NodeConditionPredicate {
// VM_IP NEGs can include unready and upgrading nodes.
if mode == L4ClusterMode || mode == L4LocalMode {
return NodePredicateForNetworkEndpointType(VmIpEndpointType)
}
return NodePredicateForNetworkEndpointType(VmIpPortEndpointType)
}

// NodePredicateForNetworkEndpointType returns the predicate function to select candidate nodes, given the NEG type.
func NodePredicateForNetworkEndpointType(negType NetworkEndpointType) utils.NodeConditionPredicate {
if negType == VmIpEndpointType {
return utils.CandidateNodesPredicateIncludeUnreadyExcludeUpgradingNodes
}
return utils.CandidateNodesPredicate
Expand Down

0 comments on commit 0b294be

Please sign in to comment.