Skip to content

Commit

Permalink
Add metrics for endpoint state
Browse files Browse the repository at this point in the history
Added metrics to collect the state of each endpoint.
  • Loading branch information
sawsa307 committed Mar 1, 2023
1 parent 31b8430 commit 767e13b
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 37 deletions.
51 changes: 50 additions & 1 deletion pkg/neg/metrics/neg_metrics_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/util/wait"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/klog/v2"
Expand All @@ -45,6 +46,21 @@ type SyncerMetrics struct {
logger klog.Logger
}

const (
	// epStateLabel is the label name under which the endpoint state is
	// attached to each sample of the syncerEndpointState gauge.
	epStateLabel = "endpoint_state"
	// syncEndpointStateKey is the metric name of the syncerEndpointState gauge.
	syncEndpointStateKey = "neg_sync_endpoint_state"
)

// syncerEndpointState tracks the count of endpoints in different states.
// It is a gauge vector with one time series per value of the epStateLabel label.
var syncerEndpointState = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Subsystem: negControllerSubsystem,
		Name:      syncEndpointStateKey,
		Help:      "Current count of endpoints in each state",
	},
	[]string{epStateLabel},
)

// NewNegMetricsCollector initializes SyncerMetrics and starts a go routine to compute and export metrics periodically.
func NewNegMetricsCollector(exportInterval time.Duration, logger klog.Logger) *SyncerMetrics {
return &SyncerMetrics{
Expand All @@ -63,6 +79,7 @@ func FakeSyncerMetrics() *SyncerMetrics {

// RegisterSyncerMetrics registers syncer related metrics with the default
// Prometheus registry. It currently registers the syncerEndpointState gauge.
// prometheus.MustRegister panics if the collector cannot be registered.
func RegisterSyncerMetrics() {
prometheus.MustRegister(syncerEndpointState)
}

func (sm *SyncerMetrics) Run(stopCh <-chan struct{}) {
Expand All @@ -77,6 +94,11 @@ func (sm *SyncerMetrics) Run(stopCh <-chan struct{}) {

// export publishes the aggregated endpoint state counts: for every state
// returned by computeSyncerEndpointStateMetrics, the syncerEndpointState gauge
// labelled with that state is set to the aggregated count.
func (sm *SyncerMetrics) export() {
	stateCounts := sm.computeSyncerEndpointStateMetrics()
	sm.logger.V(3).Info("Exporting endpoint state metrics.")
	for epState, n := range stateCounts {
		gauge := syncerEndpointState.WithLabelValues(string(epState))
		gauge.Set(float64(n))
	}
}

// UpdateSyncer updates the status of the corresponding syncer based on the syncResult.
Expand All @@ -96,7 +118,7 @@ func (sm *SyncerMetrics) SetSyncerEPMetrics(key negtypes.NegSyncerKey, endpointS
defer sm.mu.Unlock()
if sm.syncerEndpointStateMap == nil {
sm.syncerEndpointStateMap = make(map[negtypes.NegSyncerKey]negtypes.StateCountMap)
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerEPStateMap: %v", sm.syncerEndpointStateMap)
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerEndpointStateMap: %v", sm.syncerEndpointStateMap)
}
sm.syncerEndpointStateMap[key] = endpointStat.EndpointStateCount

Expand All @@ -106,3 +128,30 @@ func (sm *SyncerMetrics) SetSyncerEPMetrics(key negtypes.NegSyncerKey, endpointS
}
sm.syncerEPSStateMap[key] = endpointStat.EndpointSliceStateCount
}

// computeSyncerEndpointStateMetrics sums the per-syncer endpoint state counts
// stored in sm.syncerEndpointStateMap into a single map keyed by state.
// The syncer metrics mutex is held for the duration of the computation.
func (sm *SyncerMetrics) computeSyncerEndpointStateMetrics() map[negtypes.State]int {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	// Seed the listed states with zero so that each of them is present in the
	// result even when no syncer reported it.
	totals := negtypes.StateCountMap{}
	for _, st := range []negtypes.State{
		negtypes.EPMissingNodeName,
		negtypes.EPMissingPod,
		negtypes.EPMissingZone,
		negtypes.EPMissingField,
		negtypes.EPDuplicate,
		negtypes.EPTotal,
	} {
		totals[st] = 0
	}

	for syncerKey, perSyncer := range sm.syncerEndpointStateMap {
		sm.logger.V(6).Info("Computing syncer endpoint state metrics.", "Syncer key", syncerKey,
			"EPMissingNodeName", perSyncer[negtypes.EPMissingNodeName],
			"EPMissingPod", perSyncer[negtypes.EPMissingPod],
			"EPMissingZone", perSyncer[negtypes.EPMissingZone],
			"EPMissingField", perSyncer[negtypes.EPMissingField],
			"EPDuplicate", perSyncer[negtypes.EPDuplicate],
			"EPTotal", perSyncer[negtypes.EPTotal])
		// Accumulate every state this syncer reported, including any state
		// that is not in the seeded list above.
		for st, n := range perSyncer {
			totals[st] += n
		}
	}

	sm.logger.V(4).Info("Syncer endpoint state metrics computed.")
	return totals
}
12 changes: 6 additions & 6 deletions pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (l *LocalL4ILBEndpointsCalculator) Mode() types.EndpointsCalculatorMode {
}

// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, currentMap map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, int, error) {
func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, currentMap map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, *types.SyncerEPStat, error) {
// List all nodes where the service endpoints are running. Get a subset of the desired count.
zoneNodeMap := make(map[string][]*v1.Node)
processedNodes := sets.String{}
Expand Down Expand Up @@ -101,12 +101,12 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints
}
if numEndpoints == 0 {
// Not having backends will cause clients to see connection timeout instead of an "ICMP ConnectionRefused".
return nil, nil, 0, nil
return nil, nil, types.NewSyncerEPStat(), nil
}
// Compute the networkEndpoints, with total endpoints count <= l.subsetSizeLimit
klog.V(2).Infof("Got zoneNodeMap as input for service", "zoneNodeMap", nodeMapToString(zoneNodeMap), "serviceID", l.svcId)
subsetMap, err := getSubsetPerZone(zoneNodeMap, l.subsetSizeLimit, l.svcId, currentMap, l.logger)
return subsetMap, nil, 0, err
return subsetMap, nil, types.NewSyncerEPStat(), err
}

// ClusterL4ILBEndpointGetter implements the NetworkEndpointsCalculator interface.
Expand Down Expand Up @@ -142,7 +142,7 @@ func (l *ClusterL4ILBEndpointsCalculator) Mode() types.EndpointsCalculatorMode {
}

// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.EndpointsData, currentMap map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, int, error) {
func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.EndpointsData, currentMap map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, *types.SyncerEPStat, error) {
// In this mode, any of the cluster nodes can be part of the subset, whether or not a matching pod runs on it.
nodes, _ := utils.ListWithPredicate(l.nodeLister, utils.CandidateNodesPredicateIncludeUnreadyExcludeUpgradingNodes)

Expand All @@ -158,7 +158,7 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.Endpoints
klog.V(2).Infof("Got zoneNodeMap as input for service", "zoneNodeMap", nodeMapToString(zoneNodeMap), "serviceID", l.svcId)
// Compute the networkEndpoints, with total endpoints <= l.subsetSizeLimit.
subsetMap, err := getSubsetPerZone(zoneNodeMap, l.subsetSizeLimit, l.svcId, currentMap, l.logger)
return subsetMap, nil, 0, err
return subsetMap, nil, types.NewSyncerEPStat(), err
}

// L7EndpointsCalculator implements methods to calculate Network endpoints for VM_IP_PORT NEGs
Expand Down Expand Up @@ -186,7 +186,7 @@ func (l *L7EndpointsCalculator) Mode() types.EndpointsCalculatorMode {
}

// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, int, error) {
func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, *types.SyncerEPStat, error) {
return toZoneNetworkEndpointMap(eds, l.zoneGetter, l.servicePortName, l.networkEndpointType)
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (s *transactionSyncer) syncInternalImpl() error {

var targetMap map[string]negtypes.NetworkEndpointSet
var endpointPodMap negtypes.EndpointPodMap
var dupCount int
var syncerEPStat *negtypes.SyncerEPStat

slices, err := s.endpointSliceLister.ByIndex(endpointslices.EndpointSlicesByServiceIndex, endpointslices.FormatEndpointSlicesServiceKey(s.Namespace, s.Name))
if err != nil {
Expand Down Expand Up @@ -258,11 +258,11 @@ func (s *transactionSyncer) syncInternalImpl() error {

}
endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
targetMap, endpointPodMap, syncerEPStat, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if valid, reason := s.isValidEPField(err); !valid {
s.setErrorState(reason)
}
if valid, reason := s.isValidEndpointInfo(endpointsData, endpointPodMap, dupCount); !valid {
if valid, reason := s.isValidEndpointInfo(endpointsData, endpointPodMap, syncerEPStat.EndpointStateCount[negtypes.EPDuplicate]); !valid {
s.setErrorState(reason)
}
if err != nil {
Expand Down Expand Up @@ -388,11 +388,11 @@ func (s *transactionSyncer) isValidEndpointInfo(eds []negtypes.EndpointsData, en

// isValidEPField returns false and the corresponding reason if there is endpoint with missing zone or nodeName
func (s *transactionSyncer) isValidEPField(err error) (bool, string) {
if errors.Is(err, ErrEPMissingNodeName) {
if errors.Is(err, negtypes.ErrEPMissingNodeName) {
s.logger.Info("Detected unexpected error when checking missing nodeName", "error", err)
return false, negtypes.ResultEPMissingNodeName
}
if errors.Is(err, ErrEPMissingZone) {
if errors.Is(err, negtypes.ErrEPMissingZone) {
s.logger.Info("Detected unexpected error when checking missing zone", "error", err)
return false, negtypes.ResultEPMissingZone
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2295,7 +2295,7 @@ func TestIsValidEPField(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
_, _, _, err := transactionSyncer.endpointsCalculator.CalculateEndpoints(tc.endpointsData, nil)
if got, reason := transactionSyncer.isValidEPField(err); got != tc.expect && reason != tc.expectedReason {
t.Errorf("isValidEPField() = %t, expected %t, err: %v, ", got, tc.expect, err)
t.Errorf("isValidEPField() = %t, expected %t, err: %v", got, tc.expect, err)
}
})
}
Expand Down
65 changes: 42 additions & 23 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package syncers

import (
"errors"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -46,12 +45,6 @@ const (
separator = "||"
)

var (
ErrEPMissingNodeName = errors.New("endpoint has empty nodeName field")
ErrNodeNotFound = errors.New("failed to retrieve associated zone of node")
ErrEPMissingZone = errors.New("endpoint has empty zone field")
)

// encodeEndpoint encodes ip and instance into a single string
func encodeEndpoint(ip, instance, port string) string {
return strings.Join([]string{ip, instance, port}, separator)
Expand Down Expand Up @@ -224,13 +217,14 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService
}

// toZoneNetworkEndpointMap translates addresses in endpoints object into zone and endpoints map, and also returns a SyncerEPStat carrying the per-state endpoint counts (including the count of duplicate endpoints)
func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, servicePortName string, networkEndpointType negtypes.NetworkEndpointType) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, int, error) {
func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, servicePortName string, networkEndpointType negtypes.NetworkEndpointType) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, *negtypes.SyncerEPStat, error) {
zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{}
networkEndpointPodMap := negtypes.EndpointPodMap{}
dupCount := 0
syncerEPStat := negtypes.NewSyncerEPStat()
var returnErr error
if eds == nil {
klog.Errorf("Endpoint object is nil")
return zoneNetworkEndpointMap, networkEndpointPodMap, dupCount, nil
return zoneNetworkEndpointMap, networkEndpointPodMap, syncerEPStat, nil
}
var foundMatchingPort bool
for _, ed := range eds {
Expand All @@ -251,20 +245,15 @@ func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes.
foundMatchingPort = true

for _, endpointAddress := range ed.Addresses {
if endpointAddress.NodeName == nil || len(*endpointAddress.NodeName) == 0 {
klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not have an associated node. Skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
return nil, nil, dupCount, ErrEPMissingNodeName
}
if endpointAddress.TargetRef == nil {
klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not have an associated pod. Skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
continue
zone, foundNode, skipping, err := checkMissingField(zoneGetter, endpointAddress, syncerEPStat.EndpointStateCount, fmt.Sprintf("%s/%s", ed.Meta.Namespace, ed.Meta.Name))
if !foundNode && returnErr == nil {
returnErr = negtypes.ErrNodeNotFound
}
zone, err := zoneGetter.GetZoneForNode(*endpointAddress.NodeName)
if err != nil {
return nil, nil, dupCount, ErrNodeNotFound
returnErr = err
}
if zone == "" {
return nil, nil, dupCount, ErrEPMissingZone
if skipping {
continue
}
if zoneNetworkEndpointMap[zone] == nil {
zoneNetworkEndpointMap[zone] = negtypes.NewNetworkEndpointSet()
Expand All @@ -280,7 +269,7 @@ func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes.

// increment the count for duplicate endpoint
if _, contains := networkEndpointPodMap[networkEndpoint]; contains {
dupCount += 1
syncerEPStat.EndpointStateCount[negtypes.EPDuplicate] += 1
}
networkEndpointPodMap[networkEndpoint] = types.NamespacedName{Namespace: endpointAddress.TargetRef.Namespace, Name: endpointAddress.TargetRef.Name}
}
Expand All @@ -293,7 +282,37 @@ func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes.
if len(zoneNetworkEndpointMap) == 0 || len(networkEndpointPodMap) == 0 {
klog.V(3).Infof("Generated empty endpoint maps (zoneNetworkEndpointMap: %+v, networkEndpointPodMap: %v) from Endpoints object: %+v", zoneNetworkEndpointMap, networkEndpointPodMap, eds)
}
return zoneNetworkEndpointMap, networkEndpointPodMap, dupCount, nil
return zoneNetworkEndpointMap, networkEndpointPodMap, syncerEPStat, returnErr
}

// checkMissingField inspects a single endpoint address for a missing pod
// reference, node name, or zone, and records what it finds in epState.
//
// It returns, in order:
//   - the zone reported by zoneGetter for the endpoint's node (empty when the
//     node name is missing and no lookup was attempted);
//   - whether the zone lookup completed without error (also true when no
//     lookup was attempted);
//   - whether the caller should skip this endpoint because a field is missing;
//   - negtypes.ErrEPMissingNodeName or negtypes.ErrEPMissingZone when the
//     corresponding field is missing, nil otherwise. A missing pod reference
//     alone marks the endpoint as skipped but does not produce an error.
//
// endpointsName is only used in log messages.
func checkMissingField(zoneGetter negtypes.ZoneGetter, endpointAddress negtypes.AddressData, epState negtypes.StateCountMap, endpointsName string) (string, bool, bool, error) {
	var (
		zone      string
		lookupErr error
		fieldErr  error
		skip      bool
	)

	if endpointAddress.TargetRef == nil {
		klog.V(2).Infof("Endpoint %q in Endpoints %s does not have an associated pod. Skipping", endpointAddress.Addresses, endpointsName)
		epState[negtypes.EPMissingPod]++
		skip = true
	}

	if nodeName := endpointAddress.NodeName; nodeName != nil && len(*nodeName) > 0 {
		zone, lookupErr = zoneGetter.GetZoneForNode(*nodeName)
		if zone == "" {
			epState[negtypes.EPMissingZone]++
			fieldErr = negtypes.ErrEPMissingZone
			skip = true
		}
	} else {
		// Without a node name no zone lookup is attempted, so the endpoint is
		// counted as missing both its node name and its zone.
		klog.V(2).Infof("Endpoint %q in Endpoints %s does not have an associated node. Skipping", endpointAddress.Addresses, endpointsName)
		epState[negtypes.EPMissingNodeName]++
		epState[negtypes.EPMissingZone]++
		fieldErr = negtypes.ErrEPMissingNodeName
		skip = true
	}

	// EPMissingField is incremented once per skipped endpoint, regardless of
	// how many individual fields were missing.
	if skip {
		epState[negtypes.EPMissingField]++
	}
	return zone, lookupErr == nil, skip, fieldErr
}

// retrieveExistingZoneNetworkEndpointMap lists existing network endpoints in the neg and return the zone and endpoints map
Expand Down
2 changes: 1 addition & 1 deletion pkg/neg/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type NegSyncerManager interface {
type NetworkEndpointsCalculator interface {
// CalculateEndpoints computes the NEG endpoints based on service endpoints and the current NEG state and returns a
// map of zone name to network endpoint set
CalculateEndpoints(eds []EndpointsData, currentMap map[string]NetworkEndpointSet) (map[string]NetworkEndpointSet, EndpointPodMap, int, error)
CalculateEndpoints(eds []EndpointsData, currentMap map[string]NetworkEndpointSet) (map[string]NetworkEndpointSet, EndpointPodMap, *SyncerEPStat, error)
// Mode indicates the mode that the EndpointsCalculator is operating in.
Mode() EndpointsCalculatorMode
}

0 comments on commit 767e13b

Please sign in to comment.