Skip to content

Commit

Permalink
Collect state counts for endpoints and endpoint slices
Browse files Browse the repository at this point in the history
Define SyncerEPStat to store the counts of endpoints and endpoint slices
in different states. Define the states for endpoints and endpoint slices.
Collect the state counts in the NEG metrics collector.
  • Loading branch information
sawsa307 committed Feb 17, 2023
1 parent 3dd1e48 commit 07f8b13
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 32 deletions.
30 changes: 27 additions & 3 deletions pkg/neg/metrics/neg_metrics_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@ import (

// SyncerMetricsCollector is the interface through which a syncer reports
// its sync results and its endpoint / endpoint slice state counts.
type SyncerMetricsCollector interface {
// UpdateSyncer records the sync result for the syncer identified by key.
UpdateSyncer(key negtypes.NegSyncerKey, result *negtypes.NegSyncResult)
// SetSyncerEPMetrics records the endpoint and endpoint slice state counts
// carried by epState for the syncer identified by key.
SetSyncerEPMetrics(key negtypes.NegSyncerKey, epState *negtypes.SyncerEPStat)
}

type SyncerMetrics struct {
// syncerStatusMap tracks the status of each syncer
syncerStatusMap map[negtypes.NegSyncerKey]string
// syncerEndpointStateMap is a map between syncer and endpoint state counts
syncerEndpointStateMap map[negtypes.NegSyncerKey]negtypes.StateCountMap
// syncerEPSStateMap is a map between syncer and endpoint slice state counts
syncerEPSStateMap map[negtypes.NegSyncerKey]negtypes.StateCountMap
// mu avoid race conditions and ensure correctness of metrics
mu sync.Mutex
// duration between metrics exports
Expand All @@ -43,9 +48,11 @@ type SyncerMetrics struct {
// NewNegMetricsCollector returns a SyncerMetrics whose per-syncer maps are
// allocated and empty, whose export interval is exportInterval, and whose
// logger is logger named "NegMetricsCollector".
// NOTE(review): the constructor itself does not start a goroutine; the
// periodic compute/export loop must be started elsewhere — confirm.
func NewNegMetricsCollector(exportInterval time.Duration, logger klog.Logger) *SyncerMetrics {
return &SyncerMetrics{
syncerStatusMap: make(map[negtypes.NegSyncerKey]string),
metricsInterval: exportInterval,
logger: logger.WithName("NegMetricsCollector"),
syncerStatusMap: make(map[negtypes.NegSyncerKey]string),
syncerEndpointStateMap: make(map[negtypes.NegSyncerKey]negtypes.StateCountMap),
syncerEPSStateMap: make(map[negtypes.NegSyncerKey]negtypes.StateCountMap),
metricsInterval: exportInterval,
logger: logger.WithName("NegMetricsCollector"),
}
}

Expand Down Expand Up @@ -82,3 +89,20 @@ func (sm *SyncerMetrics) UpdateSyncer(key negtypes.NegSyncerKey, syncResult *neg
}
sm.syncerStatusMap[key] = syncResult.Result
}

// SetSyncerEPMetrics records the endpoint and endpoint slice state counts
// carried by endpointStat for the syncer identified by key, replacing any
// counts previously stored for that key.
//
// A nil endpointStat is logged and ignored rather than dereferenced. If
// either state map is found nil it is re-created before the write, so a
// SyncerMetrics that was not built by NewNegMetricsCollector does not panic
// on map assignment.
func (sm *SyncerMetrics) SetSyncerEPMetrics(key negtypes.NegSyncerKey, endpointStat *negtypes.SyncerEPStat) {
	if endpointStat == nil {
		sm.logger.Info("Ignoring nil endpoint stat", "negSyncerKey", key.String())
		return
	}

	sm.mu.Lock()
	defer sm.mu.Unlock()

	if sm.syncerEndpointStateMap == nil {
		sm.syncerEndpointStateMap = make(map[negtypes.NegSyncerKey]negtypes.StateCountMap)
		// The logger is structured (message + key/value pairs); printf verbs
		// are not expanded, so the message is passed as a plain string.
		sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerEndpointStateMap")
	}
	sm.syncerEndpointStateMap[key] = endpointStat.EndpointStateCount

	if sm.syncerEPSStateMap == nil {
		sm.syncerEPSStateMap = make(map[negtypes.NegSyncerKey]negtypes.StateCountMap)
		sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerEPSStateMap")
	}
	sm.syncerEPSStateMap[key] = endpointStat.EndpointSliceStateCount
}
30 changes: 15 additions & 15 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,6 @@ type transactionSyncer struct {

// syncCollector collect sync related metrics
syncCollector metrics.SyncerMetricsCollector

wg sync.WaitGroup
}

func NewTransactionSyncer(
Expand Down Expand Up @@ -453,6 +451,8 @@ func (s *transactionSyncer) isValidEPBatch(err error, operation transactionOp, n

// syncNetworkEndpoints spins off go routines to execute NEG operations
func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) error {
wg := sync.WaitGroup{}

syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error {
for zone, endpointSet := range endpointMap {
if endpointSet.Len() == 0 {
Expand All @@ -476,10 +476,10 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
}

if operation == attachOp {
s.attachNetworkEndpoints(zone, batch)
s.attachNetworkEndpoints(zone, batch, &wg)
}
if operation == detachOp {
s.detachNetworkEndpoints(zone, batch)
s.detachNetworkEndpoints(zone, batch, &wg)
}
}
return nil
Expand All @@ -492,34 +492,34 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
if err := syncFunc(removeEndpoints, detachOp); err != nil {
return err
}
go s.collectSyncResult()
go s.collectSyncResult(&wg)
return nil
}

// collectSyncResult blocks until every operation registered on the
// WaitGroup has called Done. syncNetworkEndpoints launches it in its own
// goroutine, so the wait does not hold up the sync call itself.
// NOTE(review): the body only waits; collecting the sync result and
// emitting metrics is not visible here — confirm where that is done.
func (s *transactionSyncer) collectSyncResult() {
s.wg.Wait()
func (s *transactionSyncer) collectSyncResult(wg *sync.WaitGroup) {
wg.Wait()
}

// attachNetworkEndpoints logs the number of endpoints being attached in
// zone, registers one pending operation on the WaitGroup, and starts a
// goroutine running operationInternal with attachOp.
// Add(1) is called before the goroutine is started, and operationInternal
// defers the matching Done.
func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) {
s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
s.wg.Add(1)
go s.operationInternal(attachOp, zone, networkEndpointMap)
wg.Add(1)
go s.operationInternal(attachOp, zone, networkEndpointMap, wg)
}

// detachNetworkEndpoints logs the number of endpoints being detached in
// zone, registers one pending operation on the WaitGroup, and starts a
// goroutine running operationInternal with detachOp.
// Add(1) is called before the goroutine is started, and operationInternal
// defers the matching Done.
func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) {
s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
s.wg.Add(1)
go s.operationInternal(detachOp, zone, networkEndpointMap)
wg.Add(1)
go s.operationInternal(detachOp, zone, networkEndpointMap, wg)
}

// operationInternal executes NEG API call and commits the transactions
// It will record events when operations are completed
// If error occurs or any transaction entry requires reconciliation, it will trigger resync
func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
defer s.wg.Done()
func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) {
defer wg.Done()
var err error
start := time.Now()
networkEndpoints := []*composite.NetworkEndpoint{}
Expand Down
27 changes: 13 additions & 14 deletions pkg/neg/types/endpoint_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,22 @@ package types

// State is the name of an endpoint or endpoint slice condition, such as
// the values declared in this file's const block.
type State string

// String returns s converted to a plain string.
func (s State) String() string { return string(s) }

// Known State values for endpoints (EP*) and endpoint slices (EPS*).
// NOTE(review): presumably used as keys of StateCountMap — confirm against
// the StateCountMap definition.
const (
// Endpoint states.
EPMissingNodeName = State("endpointMissingNodeName")
EPMissingPod = State("endpointMissingPod")
EPMissingZone = State("endpointMissingZone")
EPMissingField = State("endpointMissingField")
EPDuplicate = State("endpointDuplicate")
EPTotal = State("endpointTotal")
EPMissingNodeName = State("EndpointMissingNodeName")
EPMissingPod = State("EndpointMissingPod")
EPMissingZone = State("EndpointMissingZone")
EPMissingField = State("EndpointMissingField")
EPDuplicate = State("EndpointDuplicate")
EPTotal = State("EndpointTotal")

// Endpoint slice states; judging by the names, each marks a slice that
// contains at least one endpoint in the corresponding endpoint state
// (TODO confirm).
EPSWithMissingNodeName = State("EndpointsliceWithMissingNodeNameEP")
EPSWithMissingPod = State("EndpointsliceWithMissingPodEP")
EPSWithMissingZone = State("EndpointsliceWithMissingZoneEP")
EPSWithMissingField = State("EndpointsliceWithMissingFieldEP")
EPSWithDuplicate = State("EndpointsliceWithDuplicateEP")
EPSTotal = State("EndpointsliceTotal")
)

// StateForEP returns a new slice holding every endpoint-level State:
// the five error states followed by EPTotal.
func StateForEP() []State {
	endpointStates := []State{
		EPMissingNodeName,
		EPMissingPod,
		EPMissingZone,
		EPMissingField,
		EPDuplicate,
		EPTotal,
	}
	return endpointStates
}

// SyncerEPStat contains endpoint and endpointslice status related to a syncer
type SyncerEPStat struct {
EndpointStateCount StateCountMap
Expand Down

0 comments on commit 07f8b13

Please sign in to comment.