From a288318551e8bb2f3b0a8f7426625768d665b192 Mon Sep 17 00:00:00 2001 From: David Cheung Date: Tue, 14 Feb 2023 00:22:16 +0000 Subject: [PATCH] Refactor syncNetworkEndpoints Add a waitgroup that waits for the completeness of all NEG oeprations. --- pkg/neg/syncers/transaction.go | 77 ++++++++++++++++++----------- pkg/neg/syncers/transaction_test.go | 62 ++++++++++++++++------- pkg/neg/types/sync_results.go | 17 ++++--- 3 files changed, 102 insertions(+), 54 deletions(-) diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 15727b2495..7105415691 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -97,16 +97,18 @@ type transactionSyncer struct { logger klog.Logger - // inError indicates if the syncer is in any of 4 error scenarios + // errorState indicates if the syncer is in any of 4 error scenarios // 1. Endpoint counts from EPS is different from calculated endpoint list // 2. EndpontSlice has missing or invalid data // 3. Attach/Detach EP fails due to incorrect batch information // 4. Endpoint count from EPS or calculated endpoint list is 0 // Need to grab syncLock first for any reads or writes based on this value - inError bool + errorState string // syncCollector collect sync related metrics syncCollector metrics.SyncerMetricsCollector + + wg sync.WaitGroup } func NewTransactionSyncer( @@ -152,7 +154,7 @@ func NewTransactionSyncer( syncCollector: syncerMetrics, customName: customName, enableEndpointSlices: enableEndpointSlices, - inError: false, + errorState: "", logger: logger, } // Syncer implements life cycle logic @@ -267,8 +269,11 @@ func (s *transactionSyncer) syncInternalImpl() error { } endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices) targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap) - if !s.isValidEPField(err) || !s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount) { - s.setErrorState() + if valid, reason := s.isValidEPField(err); !valid { + s.setErrorState(reason) + } + if valid, reason := s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount); !valid { + s.setErrorState(reason) } if err != nil { return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err) @@ -327,17 +332,17 @@ func (s *transactionSyncer) syncInternalImpl() error { // syncLock must already be acquired before execution func (s *transactionSyncer) inErrorState() bool { - return s.inError + return s.errorState == "" } // syncLock must already be acquired before execution -func (s *transactionSyncer) setErrorState() { - s.inError = true +func (s *transactionSyncer) setErrorState(errorState string) { + s.errorState = errorState } // syncLock must already be acquired before execution func (s *transactionSyncer) resetErrorState() { - s.inError = false + s.errorState = "" } // ensureNetworkEndpointGroups ensures NEGs are created and configured correctly in the corresponding zones. @@ -382,18 +387,18 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error { } // isValidEndpointInfo checks if endpoint information is correct. -// It returns false if one of the two checks fails: +// It returns false and the corresponding reason if one of the two checks fails: // // 1. The endpoint count from endpointData doesn't equal to the one from endpointPodMap: // endpiontPodMap removes the duplicated endpoints, and dupCount stores the number of duplicated it removed // and we compare the endpoint counts with duplicates // 2. The endpoint count from endpointData or the one from endpointPodMap is 0 -func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) bool { +func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) (bool, string) { // Endpoint count from EndpointPodMap countFromPodMap := len(endpointPodMap) + dupCount if countFromPodMap == 0 { s.logger.Info("Detected endpoint count from endpointPodMap going to zero", "endpointPodMap", endpointPodMap) - return false + return false, negtypes.ResultEPCalculationCountZero } // Endpoint count from EndpointData @@ -403,42 +408,47 @@ func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, en } if countFromEndpointData == 0 { s.logger.Info("Detected endpoint count from endpointData going to zero", "endpointData", eds) - return false + return false, negtypes.ResultEPSEndpointCountZero } if countFromEndpointData != countFromPodMap { s.logger.Info("Detected error when comparing endpoint counts", "endpointData", eds, "endpointPodMap", endpointPodMap, "dupCount", dupCount) - return false + return false, negtypes.ResultEPCountsDiffer } - return true + return true, negtypes.ResultSuccess } -// isValidEPField returns false if there is endpoint with missing zone or nodeName -func (s *transactionSyncer) isValidEPField(err error) bool { +// isValidEPField returns false and the corresponding reason if there is endpoint with missing zone or nodeName +func (s *transactionSyncer) isValidEPField(err error) (bool, string) { if errors.Is(err, ErrEPMissingNodeName) { s.logger.Info("Detected unexpected error when checking missing nodeName", "error", err) - return false + return false, negtypes.ResultEPMissingNodeName } if errors.Is(err, ErrEPMissingZone) { s.logger.Info("Detected unexpected error when checking missing zone", "error", err) - return false + return false, negtypes.ResultEPMissingZone } - return true + return true, negtypes.ResultSuccess } -// isValidEPBatch returns false if the error from endpoint batch response is due to bad request -func (s *transactionSyncer) isValidEPBatch(err error, operation transactionOp, networkEndpoints []*composite.NetworkEndpoint) bool { +// isValidEPBatch returns false and the corresponding reason if the error from endpoint batch response is due to bad request +func (s *transactionSyncer) isValidEPBatch(err error, operation transactionOp, networkEndpoints []*composite.NetworkEndpoint) (bool, string) { apiErr, ok := err.(*googleapi.Error) if !ok { - s.logger.Info("Detected error when parsing batch request error", "operation", operation, "error", err) - return false + s.logger.Info("Detected error when parsing batch response error", "operation", operation, "error", err) + return false, negtypes.ResultInvalidAPIResponse } errCode := apiErr.Code if errCode == http.StatusBadRequest { s.logger.Info("Detected error when sending endpoint batch information", "operation", operation, "errorCode", errCode) - return false + if operation == attachOp { + return false, negtypes.ResultInvalidEPAttach + } + if operation == detachOp { + return false, negtypes.ResultInvalidEPDetach + } } - return true + return true, negtypes.ResultSuccess } // syncNetworkEndpoints spins off go routines to execute NEG operations @@ -482,18 +492,26 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m if err := syncFunc(removeEndpoints, detachOp); err != nil { return err } + go s.collectSyncResult() return nil } +// collectSyncResult collects the result of the sync and emits the metrics for sync result +func (s *transactionSyncer) collectSyncResult() { + s.wg.Wait() +} + // attachNetworkEndpoints creates go routine to run operations for attaching network endpoints func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) { s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone) + s.wg.Add(1) go s.operationInternal(attachOp, zone, networkEndpointMap) } // detachNetworkEndpoints creates go routine to run operations for detaching network endpoints func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) { s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone) + s.wg.Add(1) go s.operationInternal(detachOp, zone, networkEndpointMap) } @@ -501,6 +519,7 @@ func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointM // It will record events when operations are completed // If error occurs or any transaction entry requires reconciliation, it will trigger resync func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) { + defer s.wg.Done() var err error start := time.Now() networkEndpoints := []*composite.NetworkEndpoint{} @@ -519,8 +538,10 @@ func (s *transactionSyncer) operationInternal(operation transactionOp, zone stri s.recordEvent(apiv1.EventTypeNormal, operation.String(), fmt.Sprintf("%s %d network endpoint(s) (NEG %q in zone %q)", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone)) } else { s.recordEvent(apiv1.EventTypeWarning, operation.String()+"Failed", fmt.Sprintf("Failed to %s %d network endpoint(s) (NEG %q in zone %q): %v", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone, err)) - if !s.isValidEPBatch(err, operation, networkEndpoints) { - s.setErrorState() + if valid, reason := s.isValidEPBatch(err, operation, networkEndpoints); !valid { + s.syncLock.Lock() + defer s.syncLock.Unlock() + s.setErrorState(reason) } } diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index b7063ca475..bf58e56ce9 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -18,6 +18,7 @@ package syncers import ( context2 "context" + "errors" "fmt" "net" "net/http" @@ -1474,6 +1475,7 @@ func TestIsValidEndpointInfo(t *testing.T) { endpointPodMap map[negtypes.NetworkEndpoint]types.NamespacedName dupCount int expect bool + expectedReason string }{ { desc: "counts equal, endpointData has no duplicated endpoints", @@ -1546,6 +1548,7 @@ func TestIsValidEndpointInfo(t *testing.T) { endpointPodMap: testEndpointPodMap, dupCount: 0, expect: true, + expectedReason: negtypes.ResultSuccess, }, { desc: "counts equal, endpointData has duplicated endpoints", @@ -1627,6 +1630,7 @@ func TestIsValidEndpointInfo(t *testing.T) { endpointPodMap: testEndpointPodMap, dupCount: 1, expect: true, + expectedReason: negtypes.ResultSuccess, }, { desc: "counts not equal, endpointData has no duplicated endpoints", @@ -1690,6 +1694,7 @@ func TestIsValidEndpointInfo(t *testing.T) { endpointPodMap: testEndpointPodMap, dupCount: 0, expect: false, + expectedReason: negtypes.ResultEPCountsDiffer, }, { desc: "counts not equal, endpointData has duplicated endpoints", @@ -1762,6 +1767,7 @@ func TestIsValidEndpointInfo(t *testing.T) { endpointPodMap: testEndpointPodMap, dupCount: 1, expect: false, + expectedReason: negtypes.ResultEPCountsDiffer, }, { desc: "endpointData has zero endpoint", @@ -1796,6 +1802,7 @@ func TestIsValidEndpointInfo(t *testing.T) { endpointPodMap: testEndpointPodMap, dupCount: 0, expect: false, + expectedReason: negtypes.ResultEPSEndpointCountZero, }, { desc: "endpointPodMap has zero endpoint", @@ -1868,6 +1875,7 @@ func TestIsValidEndpointInfo(t *testing.T) { endpointPodMap: map[negtypes.NetworkEndpoint]types.NamespacedName{}, dupCount: 0, expect: false, + expectedReason: negtypes.ResultEPCalculationCountZero, }, { desc: "endpointData and endpointPodMap both have zero endpoint", @@ -1902,6 +1910,7 @@ func TestIsValidEndpointInfo(t *testing.T) { endpointPodMap: map[negtypes.NetworkEndpoint]types.NamespacedName{}, dupCount: 0, expect: false, + expectedReason: negtypes.ResultEPCalculationCountZero, }, { desc: "endpointData and endpointPodMap both have non-zero endpoints", @@ -1974,12 +1983,13 @@ func TestIsValidEndpointInfo(t *testing.T) { endpointPodMap: testEndpointPodMap, dupCount: 0, expect: true, + expectedReason: negtypes.ResultSuccess, }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - if got := transactionSyncer.isValidEndpointInfo(tc.endpointsData, tc.endpointPodMap, tc.dupCount); got != tc.expect { + if got, reason := transactionSyncer.isValidEndpointInfo(tc.endpointsData, tc.endpointPodMap, tc.dupCount); got != tc.expect && reason != tc.expectedReason { t.Errorf("invalidEndpointInfo() = %t, expected %t", got, tc.expect) } }) @@ -2013,9 +2023,10 @@ func TestIsValidEPField(t *testing.T) { } testCases := []struct { - desc string - endpointsData []negtypes.EndpointsData - expect bool + desc string + endpointsData []negtypes.EndpointsData + expect bool + expectedReason string }{ { desc: "no missing zone or nodeName", @@ -2085,7 +2096,8 @@ func TestIsValidEPField(t *testing.T) { }, }, }, - expect: true, + expect: true, + expectedReason: negtypes.ResultSuccess, }, { desc: "contains one missing nodeName", @@ -2155,7 +2167,8 @@ func TestIsValidEPField(t *testing.T) { }, }, }, - expect: false, + expect: false, + expectedReason: negtypes.ResultEPMissingNodeName, }, { desc: "contains one empty nodeName", @@ -2225,7 +2238,8 @@ func TestIsValidEPField(t *testing.T) { }, }, }, - expect: false, + expect: false, + expectedReason: negtypes.ResultEPMissingNodeName, }, { desc: "contains one missing zone", @@ -2295,13 +2309,14 @@ func TestIsValidEPField(t *testing.T) { }, }, }, - expect: false, + expect: false, + expectedReason: negtypes.ResultEPMissingZone, }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { _, _, _, err := transactionSyncer.endpointsCalculator.CalculateEndpoints(tc.endpointsData, nil) - if got := transactionSyncer.isValidEPField(err); got != tc.expect { + if got, reason := transactionSyncer.isValidEPField(err); got != tc.expect && reason != tc.expectedReason { t.Errorf("isValidEPField() = %t, expected %t, err: %v, ", got, tc.expect, err) } }) @@ -2316,18 +2331,31 @@ func TestIsValidEPBatch(t *testing.T) { testCases := []struct { desc string - HttpStatusCode int + APIResponse error expect bool + expectedReason string }{ { - desc: "NEG API call server error, status code 500", - HttpStatusCode: http.StatusOK, + desc: "NEG API call server error, status code 500", + APIResponse: &googleapi.Error{ + Code: http.StatusOK, + }, expect: true, + expectedReason: negtypes.ResultSuccess, + }, + { + desc: "NEG API call request error, status code 400", + APIResponse: &googleapi.Error{ + Code: http.StatusBadRequest, + }, + expect: false, + expectedReason: negtypes.ResultInvalidEPAttach, }, { - desc: "NEG API call request error, status code 400", - HttpStatusCode: http.StatusBadRequest, + desc: "NEG API call invalid response", + APIResponse: errors.New("non googleapi error"), expect: false, + expectedReason: negtypes.ResultInvalidAPIResponse, }, } @@ -2335,14 +2363,12 @@ func TestIsValidEPBatch(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { mockGCE := fakeGCE.Compute().(*cloud.MockGCE) mockGCE.MockNetworkEndpointGroups.AttachNetworkEndpointsHook = func(ctx context2.Context, key *meta.Key, arg0 *compute.NetworkEndpointGroupsAttachEndpointsRequest, neg *cloud.MockNetworkEndpointGroups) error { - return &googleapi.Error{ - Code: tc.HttpStatusCode, - } + return tc.APIResponse } _, transactionSyncer := newTestTransactionSyncer(fakeCloud, negtypes.VmIpPortEndpointType, false, true) err := transactionSyncer.cloud.AttachNetworkEndpoints(transactionSyncer.NegSyncerKey.NegName, zone, networkEndpoints, transactionSyncer.NegSyncerKey.GetAPIVersion()) - if got := transactionSyncer.isValidEPBatch(err, attachOp, networkEndpoints); got != tc.expect { + if got, reason := transactionSyncer.isValidEPBatch(err, attachOp, networkEndpoints); got != tc.expect && reason != tc.expectedReason { t.Errorf("isInvalidEPBatch() = %t, expected %t", got, tc.expect) } }) diff --git a/pkg/neg/types/sync_results.go b/pkg/neg/types/sync_results.go index 3fb97033d5..5253fa4bfe 100644 --- a/pkg/neg/types/sync_results.go +++ b/pkg/neg/types/sync_results.go @@ -35,14 +35,15 @@ var ( ErrEPCalculationCountZero = errors.New("endpoint count from endpointPodMap cannot be zero") // these results have their own errors - ResultInvalidEPAttach = "InvalidEPAttach" - ResultInvalidEPDetach = "InvalidEPDetach" - ResultNegNotFound = "NegNotFound" - ResultCurrentEPNotFound = "CurrentEPNotFound" - ResultEPSNotFound = "EPSNotFound" - ResultOtherError = "OtherError" - ResultInProgress = "InProgress" - ResultSuccess = "Success" + ResultInvalidAPIResponse = "InvalidAPIResponse" + ResultInvalidEPAttach = "InvalidEPAttach" + ResultInvalidEPDetach = "InvalidEPDetach" + ResultNegNotFound = "NegNotFound" + ResultCurrentEPNotFound = "CurrentEPNotFound" + ResultEPSNotFound = "EPSNotFound" + ResultOtherError = "OtherError" + ResultInProgress = "InProgress" + ResultSuccess = "Success" ) type NegSyncResult struct {