Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor syncNetworkEndpoints #1931

Merged
merged 1 commit into from
Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 49 additions & 28 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,18 @@ type transactionSyncer struct {

logger klog.Logger

// inError indicates if the syncer is in any of 4 error scenarios
// errorState indicates if the syncer is in any of 4 error scenarios
// 1. Endpoint counts from EPS is different from calculated endpoint list
// 2. EndpontSlice has missing or invalid data
// 3. Attach/Detach EP fails due to incorrect batch information
// 4. Endpoint count from EPS or calculated endpoint list is 0
// Need to grab syncLock first for any reads or writes based on this value
inError bool
errorState string

// syncCollector collect sync related metrics
syncCollector metrics.SyncerMetricsCollector

wg sync.WaitGroup
}

func NewTransactionSyncer(
Expand Down Expand Up @@ -152,7 +154,7 @@ func NewTransactionSyncer(
syncCollector: syncerMetrics,
customName: customName,
enableEndpointSlices: enableEndpointSlices,
inError: false,
errorState: "",
logger: logger,
}
// Syncer implements life cycle logic
Expand Down Expand Up @@ -267,8 +269,11 @@ func (s *transactionSyncer) syncInternalImpl() error {
}
endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if !s.isValidEPField(err) || !s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount) {
s.setErrorState()
if valid, reason := s.isValidEPField(err); !valid {
s.setErrorState(reason)
}
if valid, reason := s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount); !valid {
s.setErrorState(reason)
}
if err != nil {
return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err)
Expand Down Expand Up @@ -327,17 +332,17 @@ func (s *transactionSyncer) syncInternalImpl() error {

// syncLock must already be acquired before execution
func (s *transactionSyncer) inErrorState() bool {
return s.inError
return s.errorState != ""
}

// syncLock must already be acquired before execution
func (s *transactionSyncer) setErrorState() {
s.inError = true
func (s *transactionSyncer) setErrorState(errorState string) {
s.errorState = errorState
}

// syncLock must already be acquired before execution
func (s *transactionSyncer) resetErrorState() {
s.inError = false
s.errorState = ""
}

// ensureNetworkEndpointGroups ensures NEGs are created and configured correctly in the corresponding zones.
Expand Down Expand Up @@ -382,18 +387,18 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error {
}

// isValidEndpointInfo checks if endpoint information is correct.
// It returns false if one of the two checks fails:
// It returns false and the corresponding reason if one of the two checks fails:
//
// 1. The endpoint count from endpointData doesn't equal to the one from endpointPodMap:
// endpiontPodMap removes the duplicated endpoints, and dupCount stores the number of duplicated it removed
// and we compare the endpoint counts with duplicates
// 2. The endpoint count from endpointData or the one from endpointPodMap is 0
func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) bool {
func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap, dupCount int) (bool, string) {
// Endpoint count from EndpointPodMap
countFromPodMap := len(endpointPodMap) + dupCount
if countFromPodMap == 0 {
s.logger.Info("Detected endpoint count from endpointPodMap going to zero", "endpointPodMap", endpointPodMap)
return false
return false, negtypes.ResultEPCalculationCountZero
}

// Endpoint count from EndpointData
Expand All @@ -403,42 +408,47 @@ func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, en
}
if countFromEndpointData == 0 {
s.logger.Info("Detected endpoint count from endpointData going to zero", "endpointData", eds)
return false
return false, negtypes.ResultEPSEndpointCountZero
}

if countFromEndpointData != countFromPodMap {
s.logger.Info("Detected error when comparing endpoint counts", "endpointData", eds, "endpointPodMap", endpointPodMap, "dupCount", dupCount)
return false
return false, negtypes.ResultEPCountsDiffer
}
return true
return true, negtypes.ResultSuccess
}

// isValidEPField returns false if there is endpoint with missing zone or nodeName
func (s *transactionSyncer) isValidEPField(err error) bool {
// isValidEPField returns false and the corresponding reason if there is endpoint with missing zone or nodeName
func (s *transactionSyncer) isValidEPField(err error) (bool, string) {
if errors.Is(err, ErrEPMissingNodeName) {
s.logger.Info("Detected unexpected error when checking missing nodeName", "error", err)
return false
return false, negtypes.ResultEPMissingNodeName
}
if errors.Is(err, ErrEPMissingZone) {
s.logger.Info("Detected unexpected error when checking missing zone", "error", err)
return false
return false, negtypes.ResultEPMissingZone
}
return true
return true, negtypes.ResultSuccess
}

// isValidEPBatch returns false if the error from endpoint batch response is due to bad request
func (s *transactionSyncer) isValidEPBatch(err error, operation transactionOp, networkEndpoints []*composite.NetworkEndpoint) bool {
// isValidEPBatch returns false and the corresponding reason if the error from endpoint batch response is due to bad request
func (s *transactionSyncer) isValidEPBatch(err error, operation transactionOp, networkEndpoints []*composite.NetworkEndpoint) (bool, string) {
apiErr, ok := err.(*googleapi.Error)
if !ok {
s.logger.Info("Detected error when parsing batch request error", "operation", operation, "error", err)
return false
s.logger.Info("Detected error when parsing batch response error", "operation", operation, "error", err)
return false, negtypes.ResultInvalidAPIResponse
}
errCode := apiErr.Code
if errCode == http.StatusBadRequest {
s.logger.Info("Detected error when sending endpoint batch information", "operation", operation, "errorCode", errCode)
return false
if operation == attachOp {
return false, negtypes.ResultInvalidEPAttach
}
if operation == detachOp {
return false, negtypes.ResultInvalidEPDetach
}
}
return true
return true, negtypes.ResultSuccess
}

// syncNetworkEndpoints spins off go routines to execute NEG operations
Expand Down Expand Up @@ -482,25 +492,34 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
if err := syncFunc(removeEndpoints, detachOp); err != nil {
return err
}
go s.collectSyncResult()
return nil
}

// collectSyncResult collects the result of the sync and emits the metrics for sync result
func (s *transactionSyncer) collectSyncResult() {
s.wg.Wait()
}

Comment on lines +500 to +503
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this function, what is the proposed way to collect the results? How will you get the aggregate result of all the goroutines?

// attachNetworkEndpoints creates go routine to run operations for attaching network endpoints
func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
s.wg.Add(1)
go s.operationInternal(attachOp, zone, networkEndpointMap)
}

// detachNetworkEndpoints creates go routine to run operations for detaching network endpoints
func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
s.wg.Add(1)
go s.operationInternal(detachOp, zone, networkEndpointMap)
}

// operationInternal executes NEG API call and commits the transactions
// It will record events when operations are completed
// If error occurs or any transaction entry requires reconciliation, it will trigger resync
func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
defer s.wg.Done()
var err error
start := time.Now()
networkEndpoints := []*composite.NetworkEndpoint{}
Expand All @@ -519,8 +538,10 @@ func (s *transactionSyncer) operationInternal(operation transactionOp, zone stri
s.recordEvent(apiv1.EventTypeNormal, operation.String(), fmt.Sprintf("%s %d network endpoint(s) (NEG %q in zone %q)", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone))
} else {
s.recordEvent(apiv1.EventTypeWarning, operation.String()+"Failed", fmt.Sprintf("Failed to %s %d network endpoint(s) (NEG %q in zone %q): %v", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone, err))
if !s.isValidEPBatch(err, operation, networkEndpoints) {
s.setErrorState()
if valid, reason := s.isValidEPBatch(err, operation, networkEndpoints); !valid {
s.syncLock.Lock()
defer s.syncLock.Unlock()
s.setErrorState(reason)
}
}

Expand Down
62 changes: 44 additions & 18 deletions pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package syncers

import (
context2 "context"
"errors"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -1474,6 +1475,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointPodMap map[negtypes.NetworkEndpoint]types.NamespacedName
dupCount int
expect bool
expectedReason string
}{
{
desc: "counts equal, endpointData has no duplicated endpoints",
Expand Down Expand Up @@ -1546,6 +1548,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointPodMap: testEndpointPodMap,
dupCount: 0,
expect: true,
expectedReason: negtypes.ResultSuccess,
},
{
desc: "counts equal, endpointData has duplicated endpoints",
Expand Down Expand Up @@ -1627,6 +1630,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointPodMap: testEndpointPodMap,
dupCount: 1,
expect: true,
expectedReason: negtypes.ResultSuccess,
},
{
desc: "counts not equal, endpointData has no duplicated endpoints",
Expand Down Expand Up @@ -1690,6 +1694,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointPodMap: testEndpointPodMap,
dupCount: 0,
expect: false,
expectedReason: negtypes.ResultEPCountsDiffer,
},
{
desc: "counts not equal, endpointData has duplicated endpoints",
Expand Down Expand Up @@ -1762,6 +1767,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointPodMap: testEndpointPodMap,
dupCount: 1,
expect: false,
expectedReason: negtypes.ResultEPCountsDiffer,
},
{
desc: "endpointData has zero endpoint",
Expand Down Expand Up @@ -1796,6 +1802,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointPodMap: testEndpointPodMap,
dupCount: 0,
expect: false,
expectedReason: negtypes.ResultEPSEndpointCountZero,
},
{
desc: "endpointPodMap has zero endpoint",
Expand Down Expand Up @@ -1868,6 +1875,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointPodMap: map[negtypes.NetworkEndpoint]types.NamespacedName{},
dupCount: 0,
expect: false,
expectedReason: negtypes.ResultEPCalculationCountZero,
},
{
desc: "endpointData and endpointPodMap both have zero endpoint",
Expand Down Expand Up @@ -1902,6 +1910,7 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointPodMap: map[negtypes.NetworkEndpoint]types.NamespacedName{},
dupCount: 0,
expect: false,
expectedReason: negtypes.ResultEPCalculationCountZero,
},
{
desc: "endpointData and endpointPodMap both have non-zero endpoints",
Expand Down Expand Up @@ -1974,12 +1983,13 @@ func TestIsValidEndpointInfo(t *testing.T) {
endpointPodMap: testEndpointPodMap,
dupCount: 0,
expect: true,
expectedReason: negtypes.ResultSuccess,
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
if got := transactionSyncer.isValidEndpointInfo(tc.endpointsData, tc.endpointPodMap, tc.dupCount); got != tc.expect {
if got, reason := transactionSyncer.isValidEndpointInfo(tc.endpointsData, tc.endpointPodMap, tc.dupCount); got != tc.expect && reason != tc.expectedReason {
t.Errorf("invalidEndpointInfo() = %t, expected %t", got, tc.expect)
}
})
Expand Down Expand Up @@ -2013,9 +2023,10 @@ func TestIsValidEPField(t *testing.T) {
}

testCases := []struct {
desc string
endpointsData []negtypes.EndpointsData
expect bool
desc string
endpointsData []negtypes.EndpointsData
expect bool
expectedReason string
}{
{
desc: "no missing zone or nodeName",
Expand Down Expand Up @@ -2085,7 +2096,8 @@ func TestIsValidEPField(t *testing.T) {
},
},
},
expect: true,
expect: true,
expectedReason: negtypes.ResultSuccess,
},
{
desc: "contains one missing nodeName",
Expand Down Expand Up @@ -2155,7 +2167,8 @@ func TestIsValidEPField(t *testing.T) {
},
},
},
expect: false,
expect: false,
expectedReason: negtypes.ResultEPMissingNodeName,
},
{
desc: "contains one empty nodeName",
Expand Down Expand Up @@ -2225,7 +2238,8 @@ func TestIsValidEPField(t *testing.T) {
},
},
},
expect: false,
expect: false,
expectedReason: negtypes.ResultEPMissingNodeName,
},
{
desc: "contains one missing zone",
Expand Down Expand Up @@ -2295,13 +2309,14 @@ func TestIsValidEPField(t *testing.T) {
},
},
},
expect: false,
expect: false,
expectedReason: negtypes.ResultEPMissingZone,
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
_, _, _, err := transactionSyncer.endpointsCalculator.CalculateEndpoints(tc.endpointsData, nil)
if got := transactionSyncer.isValidEPField(err); got != tc.expect {
if got, reason := transactionSyncer.isValidEPField(err); got != tc.expect && reason != tc.expectedReason {
t.Errorf("isValidEPField() = %t, expected %t, err: %v, ", got, tc.expect, err)
}
})
Expand All @@ -2316,33 +2331,44 @@ func TestIsValidEPBatch(t *testing.T) {

testCases := []struct {
desc string
HttpStatusCode int
APIResponse error
expect bool
expectedReason string
}{
{
desc: "NEG API call server error, status code 500",
HttpStatusCode: http.StatusOK,
desc: "NEG API call server error, status code 500",
APIResponse: &googleapi.Error{
Code: http.StatusOK,
},
expect: true,
expectedReason: negtypes.ResultSuccess,
},
{
desc: "NEG API call request error, status code 400",
APIResponse: &googleapi.Error{
Code: http.StatusBadRequest,
},
expect: false,
expectedReason: negtypes.ResultInvalidEPAttach,
},
{
desc: "NEG API call request error, status code 400",
HttpStatusCode: http.StatusBadRequest,
desc: "NEG API call invalid response",
APIResponse: errors.New("non googleapi error"),
expect: false,
expectedReason: negtypes.ResultInvalidAPIResponse,
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
mockGCE := fakeGCE.Compute().(*cloud.MockGCE)
mockGCE.MockNetworkEndpointGroups.AttachNetworkEndpointsHook = func(ctx context2.Context, key *meta.Key, arg0 *compute.NetworkEndpointGroupsAttachEndpointsRequest, neg *cloud.MockNetworkEndpointGroups) error {
return &googleapi.Error{
Code: tc.HttpStatusCode,
}
return tc.APIResponse
}
_, transactionSyncer := newTestTransactionSyncer(fakeCloud, negtypes.VmIpPortEndpointType, false, true)

err := transactionSyncer.cloud.AttachNetworkEndpoints(transactionSyncer.NegSyncerKey.NegName, zone, networkEndpoints, transactionSyncer.NegSyncerKey.GetAPIVersion())
if got := transactionSyncer.isValidEPBatch(err, attachOp, networkEndpoints); got != tc.expect {
if got, reason := transactionSyncer.isValidEPBatch(err, attachOp, networkEndpoints); got != tc.expect && reason != tc.expectedReason {
t.Errorf("isInvalidEPBatch() = %t, expected %t", got, tc.expect)
}
})
Expand Down
Loading