Skip to content

Commit

Permalink
Add label propagation calculation logic
Browse files Browse the repository at this point in the history
  • Loading branch information
ruixiansong committed Apr 14, 2023
1 parent 213c56c commit 9b62af3
Show file tree
Hide file tree
Showing 9 changed files with 330 additions and 49 deletions.
1 change: 1 addition & 0 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
wait.Until(c.gc, c.gcPeriod, stopCh)
}()
go c.reflector.Run(stopCh)
go c.syncerMetrics.Run(stopCh)
<-stopCh
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/neg/metrics/label_propagation_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
)

const (
	// Identifiers for the label propagation metrics.
	// NOTE(review): presumably consumed as metric names by the collectors
	// declared further down in this file — confirm against the var block.
	labelNumber       = "label_number_per_endpoint"
	annotationSize    = "annotation_size_per_endpoint"
	labelErrorNumber  = "label_propagation_error_count"
	numberOfEndpoints = "number_of_endpoints"
	// Label values passed to NumberOfEndpoints.WithLabelValues to split the
	// endpoint count by whether an endpoint carries annotations.
	epWithAnnotation    = "with_annotation"
	epWithoutAnnotation = "without_annotation"
)

var (
Expand Down
4 changes: 4 additions & 0 deletions pkg/neg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ func RegisterMetrics() {
prometheus.MustRegister(InitializationLatency)
prometheus.MustRegister(SyncerStaleness)
prometheus.MustRegister(EPSStaleness)
prometheus.MustRegister(NumberOfEndpoints)
prometheus.MustRegister(LabelPropagationError)
prometheus.MustRegister(LabelNumber)
prometheus.MustRegister(AnnotationSize)

RegisterSyncerMetrics()
})
Expand Down
15 changes: 15 additions & 0 deletions pkg/neg/metrics/neg_metrics_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
type SyncerMetricsCollector interface {
// UpdateSyncer updates the status of the syncer identified by key based on
// the given sync result.
UpdateSyncer(key negtypes.NegSyncerKey, result *negtypes.NegSyncResult)
// SetSyncerEPMetrics records the endpoint state statistics reported by the
// syncer identified by key.
SetSyncerEPMetrics(key negtypes.NegSyncerKey, epState *negtypes.SyncerEPStat)
// SetLabelPropagationStats records the label propagation stats reported by
// the syncer identified by key.
SetLabelPropagationStats(key negtypes.NegSyncerKey, labelstatLabelPropagationStats LabelPropagationStats)
}

type SyncerMetrics struct {
Expand Down Expand Up @@ -79,6 +80,10 @@ func (sm *SyncerMetrics) Run(stopCh <-chan struct{}) {

// export publishes the aggregated label propagation metrics: the number of
// endpoints with annotations and the number without, as two label values of
// the NumberOfEndpoints metric.
func (sm *SyncerMetrics) export() {
	aggregated := sm.computeLabelMetrics()
	total := aggregated.NumberOfEndpoints
	annotated := aggregated.EndpointsWithAnnotation

	// Endpoints without annotations are whatever remains of the total.
	NumberOfEndpoints.WithLabelValues(epWithoutAnnotation).Set(float64(total - annotated))
	NumberOfEndpoints.WithLabelValues(epWithAnnotation).Set(float64(annotated))
	sm.logger.V(3).Info("Exporting syncer related metrics", "Number of Endpoints", total)
}

// UpdateSyncer update the status of corresponding syncer based on the syncResult.
Expand Down Expand Up @@ -109,6 +114,16 @@ func (sm *SyncerMetrics) SetSyncerEPMetrics(key negtypes.NegSyncerKey, endpointS
sm.syncerEPSStateMap[key] = endpointStat.EndpointSliceStateCount
}

// SetLabelPropagationStats stores the label propagation stats reported for the
// syncer identified by key, replacing any value previously stored for it.
// The update is performed while holding sm.mu.
func (sm *SyncerMetrics) SetLabelPropagationStats(key negtypes.NegSyncerKey, labelstatLabelPropagationStats LabelPropagationStats) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	if sm.syncerLabelProagationStats == nil {
		// Writing to a nil map panics, so lazily create it if the collector
		// was not initialized properly.
		sm.syncerLabelProagationStats = make(map[negtypes.NegSyncerKey]LabelPropagationStats)
		// The logger takes a message followed by key/value pairs, not a
		// printf-style format string, so the message carries no verbs.
		sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerLabelProagationStats")
	}
	sm.syncerLabelProagationStats[key] = labelstatLabelPropagationStats
}

// computeLabelMetrics aggregates label propagation metrics.
func (sm *SyncerMetrics) computeLabelMetrics() LabelPropagationMetrics {
sm.mu.Lock()
Expand Down
23 changes: 23 additions & 0 deletions pkg/neg/syncers/labels/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/ingress-gce/pkg/neg/metrics"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/utils"
)
Expand All @@ -44,6 +45,12 @@ type PodLabelMap map[string]string
// EndpointPodLabelMap maps a network endpoint to its endpoint annotations.
type EndpointPodLabelMap map[negtypes.NetworkEndpoint]PodLabelMap

// Error type values passed to metrics.PublishLabelPropagationError.
const (
// Truncated is reported when a label hit ErrLabelTruncated.
Truncated = "truncated"
// TruncationFailure is reported when a label hit ErrLabelTruncationFailed.
TruncationFailure = "truncation_failed"
// OtherError is reported for label propagation failures other than
// truncation, e.g. when the pod backing an endpoint cannot be retrieved.
OtherError = "other_error"
)

var (
ErrLabelTruncated = errors.New("label is truncated")
ErrLabelTruncationFailed = errors.New("failed to truncate label")
Expand All @@ -70,6 +77,12 @@ func GetPodLabelMap(pod *v1.Pod, lpConfig PodLabelPropagationConfig) (PodLabelMa
errs = append(errs, err)
}

if errors.Is(err, ErrLabelTruncated) {
metrics.PublishLabelPropagationError(Truncated)
} else if errors.Is(err, ErrLabelTruncationFailed) {
metrics.PublishLabelPropagationError(TruncationFailure)
}

// Add the label to the map only if the truncation result is valid
if err == nil || errors.Is(err, ErrLabelTruncated) {
labelMap[lpKey] = labelVal
Expand Down Expand Up @@ -100,3 +113,13 @@ func truncatePodLabel(key, label string, maxTotalSize int) (string, error) {
truncatedVal := string(labelBytes[:maxTotalSize-len(keyBytes)])
return truncatedVal, fmt.Errorf("%w: `%s:%s` is truncated to `%s:%s` because the total length exceeded the limit, length: %d, limit: %d", ErrLabelTruncated, key, label, key, truncatedVal, len(key)+len(label), maxTotalSize)
}

// GetPodLabelMapSize returns the size of podLabelMap in bytes, computed as the
// sum of the byte lengths of every key and every value. A nil or empty map
// has size 0.
func GetPodLabelMapSize(podLabelMap PodLabelMap) int {
	var size int
	for key, val := range podLabelMap {
		// len on a string already counts bytes, so no []byte conversion is needed.
		size += len(key) + len(val)
	}
	return size
}
32 changes: 31 additions & 1 deletion pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (s *transactionSyncer) syncInternalImpl() error {
}
s.logger.V(2).Info("Sync NEG", "negSyncerKey", s.NegSyncerKey.String(), "endpointsCalculatorMode", s.endpointsCalculator.Mode())

currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode())
currentMap, currentPodLabelMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode())
if err != nil {
return err
}
Expand Down Expand Up @@ -312,8 +312,11 @@ func (s *transactionSyncer) syncInternalImpl() error {
// Only fetch label from pod for L7 endpoints
if flags.F.EnableNEGLabelPropagation && s.NegType == negtypes.VmIpPortEndpointType {
endpointPodLabelMap = getEndpointPodLabelMap(addEndpoints, endpointPodMap, s.podLister, s.podLabelPropagationConfig, s.recorder, s.logger)
publishAnnotationSizeMetrics(addEndpoints, endpointPodLabelMap)
}

s.syncCollector.SetLabelPropagationStats(s.NegSyncerKey, collectLabelStats(currentPodLabelMap, endpointPodLabelMap, targetMap))

if s.needCommit() {
s.commitPods(committedEndpoints, endpointPodMap)
}
Expand Down Expand Up @@ -866,11 +869,13 @@ func getEndpointPodLabelMap(endpoints map[string]negtypes.NetworkEndpointSet, en
key := fmt.Sprintf("%s/%s", endpointPodMap[endpoint].Namespace, endpointPodMap[endpoint].Name)
obj, ok, err := podLister.GetByKey(key)
if err != nil || !ok {
metrics.PublishLabelPropagationError(labels.OtherError)
logger.Error(err, "getEndpointPodLabelMap: error getting pod", "pod", key, "exist", ok)
continue
}
pod, ok := obj.(*v1.Pod)
if !ok {
metrics.PublishLabelPropagationError(labels.OtherError)
logger.Error(nil, "expected type *v1.Pod", "pod", key, "type", fmt.Sprintf("%T", obj))
continue
}
Expand All @@ -883,3 +888,28 @@ func getEndpointPodLabelMap(endpoints map[string]negtypes.NetworkEndpointSet, en
}
return endpointPodLabelMap
}

// publishAnnotationSizeMetrics walks every endpoint that is about to be
// attached and publishes, per endpoint, the byte size of its label map and
// the number of labels in it. Endpoints missing from endpointPodLabelMap are
// published with a size and count of zero.
func publishAnnotationSizeMetrics(endpoints map[string]negtypes.NetworkEndpointSet, endpointPodLabelMap labels.EndpointPodLabelMap) {
	for _, toAttach := range endpoints {
		for ep := range toAttach {
			podLabels := endpointPodLabelMap[ep]
			sizeInBytes := labels.GetPodLabelMapSize(podLabels)
			metrics.PublishAnnotationMetrics(sizeInBytes, len(podLabels))
		}
	}
}

// collectLabelStats counts the endpoints in targetEndpointMap and how many of
// them have annotations. An endpoint counts as annotated when it has a
// non-nil entry in either currentPodLabelMap or addPodLabelMap.
func collectLabelStats(currentPodLabelMap, addPodLabelMap labels.EndpointPodLabelMap, targetEndpointMap map[string]negtypes.NetworkEndpointSet) metrics.LabelPropagationStats {
	var stats metrics.LabelPropagationStats
	for _, targets := range targetEndpointMap {
		for ep := range targets {
			stats.NumberOfEndpoints++
			if currentPodLabelMap[ep] == nil && addPodLabelMap[ep] == nil {
				continue
			}
			stats.EndpointsWithAnnotation++
		}
	}
	return stats
}
109 changes: 106 additions & 3 deletions pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1532,7 +1532,7 @@ func TestUnknownNodes(t *testing.T) {
}

// Check that unknown zone did not cause endpoints to be removed
out, err := retrieveExistingZoneNetworkEndpointMap(testNegName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode)
out, _, err := retrieveExistingZoneNetworkEndpointMap(testNegName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode)
if err != nil {
t.Errorf("errored retrieving existing network endpoints")
}
Expand Down Expand Up @@ -1761,7 +1761,7 @@ func TestEnableDegradedMode(t *testing.T) {
(s.syncer.(*syncer)).stopped = false
tc.modify(s)

out, err := retrieveExistingZoneNetworkEndpointMap(tc.negName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode)
out, _, err := retrieveExistingZoneNetworkEndpointMap(tc.negName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode)
if err != nil {
t.Errorf("errored retrieving existing network endpoints")
}
Expand All @@ -1777,7 +1777,7 @@ func TestEnableDegradedMode(t *testing.T) {
t.Errorf("after syncInternal, error state is %v, expected to be %v", s.inErrorState(), tc.expectedInErrorState)
}
err = wait.PollImmediate(time.Second, 3*time.Second, func() (bool, error) {
out, err = retrieveExistingZoneNetworkEndpointMap(tc.negName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode)
out, _, err = retrieveExistingZoneNetworkEndpointMap(tc.negName, zoneGetter, fakeCloud, meta.VersionGA, negtypes.L7Mode)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -1910,6 +1910,109 @@ func TestGetEndpointPodLabelMap(t *testing.T) {
}
}

// TestCollectLabelStats verifies that collectLabelStats counts every endpoint
// in the target map and treats an endpoint as annotated when it has labels in
// either the current or the to-be-added label map.
func TestCollectLabelStats(t *testing.T) {
	t.Parallel()

	// All endpoints share one port; only the IP and node differ.
	port := strconv.Itoa(80)
	ep1 := negtypes.NetworkEndpoint{IP: "1.2.3.4", Node: negtypes.TestInstance1, Port: port}
	ep2 := negtypes.NetworkEndpoint{IP: "1.2.3.5", Node: negtypes.TestInstance2, Port: port}
	ep3 := negtypes.NetworkEndpoint{IP: "1.2.3.6", Node: negtypes.TestInstance3, Port: port}
	ep4 := negtypes.NetworkEndpoint{IP: "1.2.3.7", Node: negtypes.TestInstance4, Port: port}

	// fooBar returns a fresh single-entry label map for each use.
	fooBar := func() labels.PodLabelMap {
		return labels.PodLabelMap{"foo": "bar"}
	}

	type testCase struct {
		desc    string
		current labels.EndpointPodLabelMap
		added   labels.EndpointPodLabelMap
		target  map[string]negtypes.NetworkEndpointSet
		want    metrics.LabelPropagationStats
	}

	testCases := []testCase{
		{
			desc:    "Empty inputs",
			current: labels.EndpointPodLabelMap{},
			added:   labels.EndpointPodLabelMap{},
			target:  map[string]negtypes.NetworkEndpointSet{},
			want: metrics.LabelPropagationStats{
				EndpointsWithAnnotation: 0,
				NumberOfEndpoints:       0,
			},
		},
		{
			desc:    "No new endpoints to be added",
			current: labels.EndpointPodLabelMap{ep1: fooBar()},
			added:   labels.EndpointPodLabelMap{},
			target: map[string]negtypes.NetworkEndpointSet{
				testZone1: negtypes.NewNetworkEndpointSet(ep1, ep2),
			},
			want: metrics.LabelPropagationStats{
				EndpointsWithAnnotation: 1,
				NumberOfEndpoints:       2,
			},
		},
		{
			desc:    "Some endpoints to be added",
			current: labels.EndpointPodLabelMap{ep1: fooBar()},
			added:   labels.EndpointPodLabelMap{ep3: fooBar()},
			target: map[string]negtypes.NetworkEndpointSet{
				testZone1: negtypes.NewNetworkEndpointSet(ep1, ep2),
				testZone2: negtypes.NewNetworkEndpointSet(ep3, ep4),
			},
			want: metrics.LabelPropagationStats{
				EndpointsWithAnnotation: 2,
				NumberOfEndpoints:       4,
			},
		},
		{
			desc:    "Only newly added endpoints",
			current: labels.EndpointPodLabelMap{},
			added:   labels.EndpointPodLabelMap{ep3: fooBar()},
			target: map[string]negtypes.NetworkEndpointSet{
				testZone2: negtypes.NewNetworkEndpointSet(ep3, ep4),
			},
			want: metrics.LabelPropagationStats{
				EndpointsWithAnnotation: 1,
				NumberOfEndpoints:       2,
			},
		},
	}

	for _, tc := range testCases {
		got := collectLabelStats(tc.current, tc.added, tc.target)
		diff := cmp.Diff(got, tc.want)
		if diff == "" {
			continue
		}
		t.Errorf("For test case %s: got %+v, want %+v, diff %s", tc.desc, got, tc.want, diff)
	}
}

func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode) (negtypes.NegSyncer, *transactionSyncer) {
negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false)
ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), ts.enableDualStackNEG)
Expand Down
12 changes: 7 additions & 5 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,21 +560,22 @@ func ipsForPod(eds []negtypes.EndpointsData) map[types.NamespacedName]negtypes.N
}

// retrieveExistingZoneNetworkEndpointMap lists the existing network endpoints in the NEG and returns the zone-to-endpoints map together with a map from each endpoint to its annotations.
func retrieveExistingZoneNetworkEndpointMap(negName string, zoneGetter negtypes.ZoneGetter, cloud negtypes.NetworkEndpointGroupCloud, version meta.Version, mode negtypes.EndpointsCalculatorMode) (map[string]negtypes.NetworkEndpointSet, error) {
func retrieveExistingZoneNetworkEndpointMap(negName string, zoneGetter negtypes.ZoneGetter, cloud negtypes.NetworkEndpointGroupCloud, version meta.Version, mode negtypes.EndpointsCalculatorMode) (map[string]negtypes.NetworkEndpointSet, labels.EndpointPodLabelMap, error) {
// Include zones that have non-candidate nodes currently. It is possible that NEGs were created in those zones previously and the endpoints now became non-candidates.
// Endpoints in those NEGs now need to be removed. This mostly applies to VM_IP_NEGs where the endpoints are nodes.
zones, err := zoneGetter.ListZones(utils.AllNodesPredicate)
if err != nil {
return nil, err
return nil, nil, err
}

candidateNodeZones, err := zoneGetter.ListZones(negtypes.NodePredicateForEndpointCalculatorMode(mode))
if err != nil {
return nil, err
return nil, nil, err
}
candidateZonesMap := sets.NewString(candidateNodeZones...)

zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{}
endpointPodLabelMap := labels.EndpointPodLabelMap{}
for _, zone := range zones {
networkEndpointsWithHealthStatus, err := cloud.ListNetworkEndpoints(negName, zone, false, version)
if err != nil {
Expand All @@ -584,7 +585,7 @@ func retrieveExistingZoneNetworkEndpointMap(negName string, zoneGetter negtypes.
klog.Infof("Ignoring NotFound error for NEG %q in zone %q", negName, zone)
continue
}
return nil, fmt.Errorf("Failed to lookup NEG in zone %q, candidate zones %v, err - %v", zone, candidateZonesMap, err)
return nil, nil, fmt.Errorf("Failed to lookup NEG in zone %q, candidate zones %v, err - %v", zone, candidateZonesMap, err)
}
zoneNetworkEndpointMap[zone] = negtypes.NewNetworkEndpointSet()
for _, ne := range networkEndpointsWithHealthStatus {
Expand All @@ -593,9 +594,10 @@ func retrieveExistingZoneNetworkEndpointMap(negName string, zoneGetter negtypes.
newNE.Port = strconv.FormatInt(ne.NetworkEndpoint.Port, 10)
}
zoneNetworkEndpointMap[zone].Insert(newNE)
endpointPodLabelMap[newNE] = ne.NetworkEndpoint.Annotations
}
}
return zoneNetworkEndpointMap, nil
return zoneNetworkEndpointMap, endpointPodLabelMap, nil
}

// makeEndpointBatch return a batch of endpoint from the input and remove the endpoints from input set
Expand Down
Loading

0 comments on commit 9b62af3

Please sign in to comment.