Skip to content

Commit

Permalink
Refactor syncNetworkEndpoints
Browse files Browse the repository at this point in the history
Add a waitgroup that waits for the completeness of all NEG
oeprations.
  • Loading branch information
sawsa307 committed Feb 16, 2023
1 parent e8e35b5 commit a288318
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 54 deletions.
77 changes: 49 additions & 28 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,18 @@ type transactionSyncer struct {

logger klog.Logger

// inError indicates if the syncer is in any of 4 error scenarios
// errorState indicates if the syncer is in any of 4 error scenarios
// 1. Endpoint counts from EPS is different from calculated endpoint list
// 2. EndpontSlice has missing or invalid data
// 3. Attach/Detach EP fails due to incorrect batch information
// 4. Endpoint count from EPS or calculated endpoint list is 0
// Need to grab syncLock first for any reads or writes based on this value
inError bool
errorState string

// syncCollector collect sync related metrics
syncCollector metrics.SyncerMetricsCollector

wg sync.WaitGroup
}

func NewTransactionSyncer(
Expand Down Expand Up @@ -152,7 +154,7 @@ func NewTransactionSyncer(
syncCollector: syncerMetrics,
customName: customName,
enableEndpointSlices: enableEndpointSlices,
inError: false,
errorState: "",
logger: logger,
}
// Syncer implements life cycle logic
Expand Down Expand Up @@ -267,8 +269,11 @@ func (s *transactionSyncer) syncInternalImpl() error {
}
endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if !s.isValidEPField(err) || !s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount) {
s.setErrorState()
if valid, reason := s.isValidEPField(err); !valid {
s.setErrorState(reason)
}
if valid, reason := s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount); !valid {
s.setErrorState(reason)
}
if err != nil {
return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err)
Expand Down Expand Up @@ -327,17 +332,17 @@ func (s *transactionSyncer) syncInternalImpl() error {

// syncLock must already be acquired before execution
func (s *transactionSyncer) inErrorState() bool {
return s.inError
return s.errorState == ""
}

// syncLock must already be acquired before execution
func (s *transactionSyncer) setErrorState() {
s.inError = true
func (s *transactionSyncer) setErrorState(errorState string) {
s.errorState = errorState
}

// syncLock must already be acquired before execution
func (s *transactionSyncer) resetErrorState() {
s.inError = false
s.errorState = ""
}

// ensureNetworkEndpointGroups ensures NEGs are created and configured correctly in the corresponding zones.
Expand Down Expand Up @@ -382,18 +387,18 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error {
}

// isValidEndpointInfo checks if endpoint information is correct.
// It returns false if one of the two checks fails:
// It returns false and the corresponding reason if one of the two checks fails:
//
// 1. The endpoint count from endpointData doesn't equal to the one from endpointPodMap:
// endpiontPodMap removes the duplicated endpoints, and dupCount stores the number of duplicated it removed
// and we compare the endpoint counts with duplicates
// 2. The endpoint count from endpointData or the one from endpointPodMap is 0
func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) bool {
func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) (bool, string) {
// Endpoint count from EndpointPodMap
countFromPodMap := len(endpointPodMap) + dupCount
if countFromPodMap == 0 {
s.logger.Info("Detected endpoint count from endpointPodMap going to zero", "endpointPodMap", endpointPodMap)
return false
return false, negtypes.ResultEPCalculationCountZero
}

// Endpoint count from EndpointData
Expand All @@ -403,42 +408,47 @@ func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, en
}
if countFromEndpointData == 0 {
s.logger.Info("Detected endpoint count from endpointData going to zero", "endpointData", eds)
return false
return false, negtypes.ResultEPSEndpointCountZero
}

if countFromEndpointData != countFromPodMap {
s.logger.Info("Detected error when comparing endpoint counts", "endpointData", eds, "endpointPodMap", endpointPodMap, "dupCount", dupCount)
return false
return false, negtypes.ResultEPCountsDiffer
}
return true
return true, negtypes.ResultSuccess
}

// isValidEPField returns false if there is endpoint with missing zone or nodeName
func (s *transactionSyncer) isValidEPField(err error) bool {
// isValidEPField returns false and the corresponding reason if there is endpoint with missing zone or nodeName
func (s *transactionSyncer) isValidEPField(err error) (bool, string) {
if errors.Is(err, ErrEPMissingNodeName) {
s.logger.Info("Detected unexpected error when checking missing nodeName", "error", err)
return false
return false, negtypes.ResultEPMissingNodeName
}
if errors.Is(err, ErrEPMissingZone) {
s.logger.Info("Detected unexpected error when checking missing zone", "error", err)
return false
return false, negtypes.ResultEPMissingZone
}
return true
return true, negtypes.ResultSuccess
}

// isValidEPBatch returns false if the error from endpoint batch response is due to bad request
func (s *transactionSyncer) isValidEPBatch(err error, operation transactionOp, networkEndpoints []*composite.NetworkEndpoint) bool {
// isValidEPBatch returns false and the corresponding reason if the error from endpoint batch response is due to bad request
func (s *transactionSyncer) isValidEPBatch(err error, operation transactionOp, networkEndpoints []*composite.NetworkEndpoint) (bool, string) {
apiErr, ok := err.(*googleapi.Error)
if !ok {
s.logger.Info("Detected error when parsing batch request error", "operation", operation, "error", err)
return false
s.logger.Info("Detected error when parsing batch response error", "operation", operation, "error", err)
return false, negtypes.ResultInvalidAPIResponse
}
errCode := apiErr.Code
if errCode == http.StatusBadRequest {
s.logger.Info("Detected error when sending endpoint batch information", "operation", operation, "errorCode", errCode)
return false
if operation == attachOp {
return false, negtypes.ResultInvalidEPAttach
}
if operation == detachOp {
return false, negtypes.ResultInvalidEPDetach
}
}
return true
return true, negtypes.ResultSuccess
}

// syncNetworkEndpoints spins off go routines to execute NEG operations
Expand Down Expand Up @@ -482,25 +492,34 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
if err := syncFunc(removeEndpoints, detachOp); err != nil {
return err
}
go s.collectSyncResult()
return nil
}

// collectSyncResult collects the result of the sync and emits the metrics for sync result
func (s *transactionSyncer) collectSyncResult() {
s.wg.Wait()
}

// attachNetworkEndpoints creates go routine to run operations for attaching network endpoints
func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
s.wg.Add(1)
go s.operationInternal(attachOp, zone, networkEndpointMap)
}

// detachNetworkEndpoints creates go routine to run operations for detaching network endpoints
func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
s.wg.Add(1)
go s.operationInternal(detachOp, zone, networkEndpointMap)
}

// operationInternal executes NEG API call and commits the transactions
// It will record events when operations are completed
// If error occurs or any transaction entry requires reconciliation, it will trigger resync
func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
defer s.wg.Done()
var err error
start := time.Now()
networkEndpoints := []*composite.NetworkEndpoint{}
Expand All @@ -519,8 +538,10 @@ func (s *transactionSyncer) operationInternal(operation transactionOp, zone stri
s.recordEvent(apiv1.EventTypeNormal, operation.String(), fmt.Sprintf("%s %d network endpoint(s) (NEG %q in zone %q)", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone))
} else {
s.recordEvent(apiv1.EventTypeWarning, operation.String()+"Failed", fmt.Sprintf("Failed to %s %d network endpoint(s) (NEG %q in zone %q): %v", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone, err))
if !s.isValidEPBatch(err, operation, networkEndpoints) {
s.setErrorState()
if valid, reason := s.isValidEPBatch(err, operation, networkEndpoints); !valid {
s.syncLock.Lock()
defer s.syncLock.Unlock()
s.setErrorState(reason)
}
}

Expand Down
62 changes: 44 additions & 18 deletions pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package syncers

import (
context2 "context"
"errors"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -1474,6 +1475,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointPodMap map[negtypes.NetworkEndpoint]types.NamespacedName
dupCount int
expect bool
expectedReason string
}{
{
desc: "counts equal, endpointData has no duplicated endpoints",
Expand Down Expand Up @@ -1546,6 +1548,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointPodMap: testEndpointPodMap,
dupCount: 0,
expect: true,
expectedReason: negtypes.ResultSuccess,
},
{
desc: "counts equal, endpointData has duplicated endpoints",
Expand Down Expand Up @@ -1627,6 +1630,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointPodMap: testEndpointPodMap,
dupCount: 1,
expect: true,
expectedReason: negtypes.ResultSuccess,
},
{
desc: "counts not equal, endpointData has no duplicated endpoints",
Expand Down Expand Up @@ -1690,6 +1694,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointPodMap: testEndpointPodMap,
dupCount: 0,
expect: false,
expectedReason: negtypes.ResultEPCountsDiffer,
},
{
desc: "counts not equal, endpointData has duplicated endpoints",
Expand Down Expand Up @@ -1762,6 +1767,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointPodMap: testEndpointPodMap,
dupCount: 1,
expect: false,
expectedReason: negtypes.ResultEPCountsDiffer,
},
{
desc: "endpointData has zero endpoint",
Expand Down Expand Up @@ -1796,6 +1802,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointPodMap: testEndpointPodMap,
dupCount: 0,
expect: false,
expectedReason: negtypes.ResultEPSEndpointCountZero,
},
{
desc: "endpointPodMap has zero endpoint",
Expand Down Expand Up @@ -1868,6 +1875,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointPodMap: map[negtypes.NetworkEndpoint]types.NamespacedName{},
dupCount: 0,
expect: false,
expectedReason: negtypes.ResultEPCalculationCountZero,
},
{
desc: "endpointData and endpointPodMap both have zero endpoint",
Expand Down Expand Up @@ -1902,6 +1910,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointPodMap: map[negtypes.NetworkEndpoint]types.NamespacedName{},
dupCount: 0,
expect: false,
expectedReason: negtypes.ResultEPCalculationCountZero,
},
{
desc: "endpointData and endpointPodMap both have non-zero endpoints",
Expand Down Expand Up @@ -1974,12 +1983,13 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointPodMap: testEndpointPodMap,
dupCount: 0,
expect: true,
expectedReason: negtypes.ResultSuccess,
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
if got := transactionSyncer.isValidEndpointInfo(tc.endpointsData, tc.endpointPodMap, tc.dupCount); got != tc.expect {
if got, reason := transactionSyncer.isValidEndpointInfo(tc.endpointsData, tc.endpointPodMap, tc.dupCount); got != tc.expect && reason != tc.expectedReason {
t.Errorf("invalidEndpointInfo() = %t, expected %t", got, tc.expect)
}
})
Expand Down Expand Up @@ -2013,9 +2023,10 @@ func TestIsValidEPField(t *testing.T) {
}

testCases := []struct {
desc string
endpointsData []negtypes.EndpointsData
expect bool
desc string
endpointsData []negtypes.EndpointsData
expect bool
expectedReason string
}{
{
desc: "no missing zone or nodeName",
Expand Down Expand Up @@ -2085,7 +2096,8 @@ func TestIsValidEPField(t *testing.T) {
},
},
},
expect: true,
expect: true,
expectedReason: negtypes.ResultSuccess,
},
{
desc: "contains one missing nodeName",
Expand Down Expand Up @@ -2155,7 +2167,8 @@ func TestIsValidEPField(t *testing.T) {
},
},
},
expect: false,
expect: false,
expectedReason: negtypes.ResultEPMissingNodeName,
},
{
desc: "contains one empty nodeName",
Expand Down Expand Up @@ -2225,7 +2238,8 @@ func TestIsValidEPField(t *testing.T) {
},
},
},
expect: false,
expect: false,
expectedReason: negtypes.ResultEPMissingNodeName,
},
{
desc: "contains one missing zone",
Expand Down Expand Up @@ -2295,13 +2309,14 @@ func TestIsValidEPField(t *testing.T) {
},
},
},
expect: false,
expect: false,
expectedReason: negtypes.ResultEPMissingZone,
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
_, _, _, err := transactionSyncer.endpointsCalculator.CalculateEndpoints(tc.endpointsData, nil)
if got := transactionSyncer.isValidEPField(err); got != tc.expect {
if got, reason := transactionSyncer.isValidEPField(err); got != tc.expect && reason != tc.expectedReason {
t.Errorf("isValidEPField() = %t, expected %t, err: %v, ", got, tc.expect, err)
}
})
Expand All @@ -2316,33 +2331,44 @@ func TestIsValidEPBatch(t *testing.T) {

testCases := []struct {
desc string
HttpStatusCode int
APIResponse error
expect bool
expectedReason string
}{
{
desc: "NEG API call server error, status code 500",
HttpStatusCode: http.StatusOK,
desc: "NEG API call server error, status code 500",
APIResponse: &googleapi.Error{
Code: http.StatusOK,
},
expect: true,
expectedReason: negtypes.ResultSuccess,
},
{
desc: "NEG API call request error, status code 400",
APIResponse: &googleapi.Error{
Code: http.StatusBadRequest,
},
expect: false,
expectedReason: negtypes.ResultInvalidEPAttach,
},
{
desc: "NEG API call request error, status code 400",
HttpStatusCode: http.StatusBadRequest,
desc: "NEG API call invalid response",
APIResponse: errors.New("non googleapi error"),
expect: false,
expectedReason: negtypes.ResultInvalidAPIResponse,
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
mockGCE := fakeGCE.Compute().(*cloud.MockGCE)
mockGCE.MockNetworkEndpointGroups.AttachNetworkEndpointsHook = func(ctx context2.Context, key *meta.Key, arg0 *compute.NetworkEndpointGroupsAttachEndpointsRequest, neg *cloud.MockNetworkEndpointGroups) error {
return &googleapi.Error{
Code: tc.HttpStatusCode,
}
return tc.APIResponse
}
_, transactionSyncer := newTestTransactionSyncer(fakeCloud, negtypes.VmIpPortEndpointType, false, true)

err := transactionSyncer.cloud.AttachNetworkEndpoints(transactionSyncer.NegSyncerKey.NegName, zone, networkEndpoints, transactionSyncer.NegSyncerKey.GetAPIVersion())
if got := transactionSyncer.isValidEPBatch(err, attachOp, networkEndpoints); got != tc.expect {
if got, reason := transactionSyncer.isValidEPBatch(err, attachOp, networkEndpoints); got != tc.expect && reason != tc.expectedReason {
t.Errorf("isInvalidEPBatch() = %t, expected %t", got, tc.expect)
}
})
Expand Down
Loading

0 comments on commit a288318

Please sign in to comment.