diff --git a/pkg/flags/flags.go b/pkg/flags/flags.go index 2fe7bb563c..97db3fb1df 100644 --- a/pkg/flags/flags.go +++ b/pkg/flags/flags.go @@ -113,6 +113,7 @@ var ( EnableL4ILBDualStack bool EnableL4NetLBDualStack bool EnableMultipleIGs bool + EnableDegradedMode bool MaxIGSize int }{ GCERateLimitScale: 1.0, @@ -260,6 +261,8 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5 flag.IntVar(&F.MaxIGSize, "max-ig-size", 1000, "Max number of instances in Instance Group") flag.DurationVar(&F.MetricsExportInterval, "metrics-export-interval", 10*time.Minute, `Period for calculating and exporting metrics related to state of managed objects.`) flag.DurationVar(&F.NegMetricsExportInterval, "neg-metrics-export-interval", 5*time.Second, `Period for calculating and exporting internal neg controller metrics, not usage.`) + flag.BoolVar(&F.EnableDegradedMode, "enable-degraded-mode", false, `Enable endpoint calculation using degraded mode`) + } type RateLimitSpecs struct { diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 46db61b1fd..6cf8f394ca 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "net/http" + "strconv" "strings" "sync" "time" @@ -104,6 +105,9 @@ type transactionSyncer struct { // syncCollector collect sync related metrics syncCollector metrics.SyncerMetricsCollector + + // enableDegradedMode indicates whether we do endpoint calculations in degraded mode + enableDegradedMode bool } func NewTransactionSyncer( @@ -260,9 +264,16 @@ func (s *transactionSyncer) syncInternalImpl() *negtypes.NegSyncResult { s.logger.V(3).Info("Endpoint slice syncs", "Namespace", endpointslice.Namespace, "Name", endpointslice.Name, "staleness", epsStaleness) } - targetMap, endpointPodMap, result = s.computeTargetMap(endpointSlices) - if result != nil { - return result + if s.enableDegradedMode { + targetMap, endpointPodMap, result = s.computeTargetMapDegradedMode(endpointSlices, s.NegSyncerKey.PortTuple.Name, s.NegSyncerKey.NegType) + if result != nil { + return result + } + } else { + targetMap, endpointPodMap, result = s.computeTargetMap(endpointSlices) + if result != nil { + return result + } } s.logStats(targetMap, "desired NEG endpoints") @@ -316,6 +327,43 @@ func (s *transactionSyncer) computeTargetMap(endpointSlices []*discovery.Endpoin return targetMap, endpointPodMap, result } +// computeTargetMapDegradedMode does endpoint calculation differently +// to fix the error in NEG controller and return the desire map +func (s *transactionSyncer) computeTargetMapDegradedMode(slices []*discovery.EndpointSlice, servicePortName string, endpointType negtypes.NetworkEndpointType) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, *negtypes.NegSyncResult) { + var targetMap map[string]negtypes.NetworkEndpointSet + var endpointPodMap negtypes.EndpointPodMap + + for _, slice := range slices { + // if it is a custom endpoint slice, don't include it + // so it won't block the neg api call from succeeding + if val, ok := slice.ObjectMeta.Labels[discovery.LabelManagedBy]; !ok || val != "endpointslice-controller.k8s.io" { + continue + } + matchPort := "" + for _, port := range slice.Ports { + if *port.Name == servicePortName { + matchPort = strconv.Itoa(int(*port.Port)) + break + } + } + if len(matchPort) == 0 { + continue + } + + for _, ep := range slice.Endpoints { + if ep.TargetRef == nil { + continue + } + s.validateAndAddEndpoints(ep, matchPort, endpointType, targetMap, endpointPodMap) + } + } + return targetMap, endpointPodMap, nil +} + +// validateAndAddEndpoints fills in missing information and creates network endpoint for each endpoint addresss +func (s *transactionSyncer) validateAndAddEndpoints(ep discovery.Endpoint, matchPort string, endpointType negtypes.NetworkEndpointType, targetMap map[string]negtypes.NetworkEndpointSet, endpointPodMap negtypes.EndpointPodMap) { +} + // syncLock must already be acquired before execution func (s *transactionSyncer) inErrorState() bool { return s.errorState != ""