Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new metrics for NEG DualStack Migrator #2095

Merged
merged 2 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions pkg/neg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ const (

NotInDegradedEndpoints = "not_in_degraded_endpoints"
OnlyInDegradedEndpoints = "only_in_degraded_endpoints"

// Classification of endpoints within a NEG.
ipv4EndpointType = "IPv4"
ipv6EndpointType = "IPv6"
dualStackEndpointType = "DualStack"
migrationEndpointType = "Migration"
)

type syncType string
Expand Down Expand Up @@ -169,6 +175,42 @@ var (
},
degradedModeCorrectnessLabels,
)

DualStackMigrationFinishedDurations = prometheus.NewHistogram(
prometheus.HistogramOpts{
Subsystem: negControllerSubsystem,
Name: "dual_stack_migration_finished_durations_seconds",
Help: "Time taken to migrate all endpoints within all NEGs for a service port",
// Buckets ~= [1s, 1.85s, 3.42s, 6s, 11s, 21s, 40s, 1m14s, 2m17s, 4m13s, 7m49s, 14m28s, 26m47s, 49m33s, 1h31m40s, 2h49m35s, 5h13m45s, 9h40m27s, +Inf]
Buckets: prometheus.ExponentialBuckets(1, 1.85, 18),
},
)

// A zero value for this metric means that there are no ongoing migrations.
DualStackMigrationLongestUnfinishedDuration = prometheus.NewGauge(
prometheus.GaugeOpts{
Subsystem: negControllerSubsystem,
Name: "dual_stack_migration_longest_unfinished_duration_seconds",
Help: "Longest time elapsed since a migration was started which hasn't yet completed",
},
)

DualStackMigrationServiceCount = prometheus.NewGauge(
prometheus.GaugeOpts{
Subsystem: negControllerSubsystem,
Name: "dual_stack_migration_service_count",
Help: "Number of Services which have migration endpoints",
},
)

SyncerCountByEndpointType = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: negControllerSubsystem,
Name: "syncer_count_by_endpoint_type",
Help: "Number of Syncers managing NEGs containing endpoint of a particular kind",
},
[]string{"endpoint_type"},
)
)

var register sync.Once
Expand All @@ -188,6 +230,10 @@ func RegisterMetrics() {
prometheus.MustRegister(LabelNumber)
prometheus.MustRegister(AnnotationSize)
prometheus.MustRegister(DegradeModeCorrectness)
prometheus.MustRegister(DualStackMigrationFinishedDurations)
prometheus.MustRegister(DualStackMigrationLongestUnfinishedDuration)
prometheus.MustRegister(DualStackMigrationServiceCount)
prometheus.MustRegister(SyncerCountByEndpointType)

RegisterSyncerMetrics()
})
Expand Down
169 changes: 165 additions & 4 deletions pkg/neg/metrics/neg_metrics_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ limitations under the License.
package metrics

import (
"fmt"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)

type SyncerMetricsCollector interface {
Expand All @@ -35,6 +38,11 @@ type SyncerMetricsCollector interface {
}

type SyncerMetrics struct {
clock clock.Clock
// duration between metrics exports
metricsInterval time.Duration

mu sync.Mutex
// syncerStateMap tracks the status of each syncer
syncerStateMap map[negtypes.NegSyncerKey]negtypes.Reason
// syncerEndpointStateMap is a map between syncer and endpoint state counts.
Expand All @@ -43,10 +51,14 @@ type SyncerMetrics struct {
syncerEndpointSliceStateMap map[negtypes.NegSyncerKey]negtypes.StateCountMap
// syncerLabelProagationStats is a map between syncer and label propagation stats.
syncerLabelProagationStats map[negtypes.NegSyncerKey]LabelPropagationStats
// mu avoid race conditions and ensure correctness of metrics
mu sync.Mutex
// duration between metrics exports
metricsInterval time.Duration
// Stores the time when the migration started for each Syncer.
dualStackMigrationStartTime map[negtypes.NegSyncerKey]time.Time
// Stores the time when the migration finished for each Syncer.
dualStackMigrationEndTime map[negtypes.NegSyncerKey]time.Time
// Stores the count of various kinds of endpoints which each syncer manages.
// Refer neg/metrics.go for the kinds of endpoints.
endpointsCountPerType map[negtypes.NegSyncerKey]map[string]int

// logger logs message related to NegMetricsCollector
logger klog.Logger
}
Expand All @@ -58,6 +70,10 @@ func NewNegMetricsCollector(exportInterval time.Duration, logger klog.Logger) *S
syncerEndpointStateMap: make(map[negtypes.NegSyncerKey]negtypes.StateCountMap),
syncerEndpointSliceStateMap: make(map[negtypes.NegSyncerKey]negtypes.StateCountMap),
syncerLabelProagationStats: make(map[negtypes.NegSyncerKey]LabelPropagationStats),
dualStackMigrationStartTime: make(map[negtypes.NegSyncerKey]time.Time),
dualStackMigrationEndTime: make(map[negtypes.NegSyncerKey]time.Time),
endpointsCountPerType: make(map[negtypes.NegSyncerKey]map[string]int),
clock: clock.RealClock{},
metricsInterval: exportInterval,
logger: logger.WithName("NegMetricsCollector"),
}
Expand Down Expand Up @@ -104,6 +120,21 @@ func (sm *SyncerMetrics) export() {
}

sm.logger.V(3).Info("Exporting syncer related metrics", "Syncer count", syncerCount, "Number of Endpoints", lpMetrics.NumberOfEndpoints)

finishedDurations, longestUnfinishedDurations := sm.computeDualStackMigrationDurations()
for _, duration := range finishedDurations {
DualStackMigrationFinishedDurations.Observe(float64(duration))
swetharepakula marked this conversation as resolved.
Show resolved Hide resolved
}
DualStackMigrationLongestUnfinishedDuration.Set(float64(longestUnfinishedDurations))

syncerCountByEndpointType, migrationEndpointCount, migrationServicesCount := sm.computeDualStackMigrationCounts()
for endpointType, count := range syncerCountByEndpointType {
SyncerCountByEndpointType.WithLabelValues(endpointType).Set(float64(count))
}
syncerEndpointState.WithLabelValues(string(negtypes.DualStackMigration)).Set(float64(migrationEndpointCount))
DualStackMigrationServiceCount.Set(float64(migrationServicesCount))

sm.logger.V(3).Info("Exported DualStack Migration metrics")
}

// UpdateSyncerStatusInMetrics update the status of syncer based on the error
Expand Down Expand Up @@ -150,13 +181,18 @@ func (sm *SyncerMetrics) SetLabelPropagationStats(key negtypes.NegSyncerKey, lab
sm.syncerLabelProagationStats[key] = labelstatLabelPropagationStats
}

// DeleteSyncer clears every metric series tracked for the syncer identified
// by `key`. It should be invoked when a Syncer has been stopped, so that
// stale per-syncer entries are not exported again.
func (sm *SyncerMetrics) DeleteSyncer(key negtypes.NegSyncerKey) {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	// Drop the syncer's state from every tracking map. Deleting a key that
	// was never present is a no-op, so no existence checks are needed.
	delete(sm.syncerLabelProagationStats, key)
	delete(sm.syncerEndpointSliceStateMap, key)
	delete(sm.syncerEndpointStateMap, key)
	delete(sm.syncerStateMap, key)
	delete(sm.endpointsCountPerType, key)
	delete(sm.dualStackMigrationEndTime, key)
	delete(sm.dualStackMigrationStartTime, key)
}

// computeLabelMetrics aggregates label propagation metrics.
Expand Down Expand Up @@ -205,3 +241,128 @@ func (sm *SyncerMetrics) computeEndpointStateMetrics(forDegradedMode bool) (negt
}
return epCounts, epsCounts
}

// CollectDualStackMigrationMetrics will be used by dualstack.Migrator to
// export metrics.
//
// committedEndpoints are the endpoints currently committed for this syncer
// (keyed by an opaque grouping string — presumably zone; confirm with the
// caller), and migrationCount is the number of endpoints undergoing
// dual-stack migration.
func (sm *SyncerMetrics) CollectDualStackMigrationMetrics(key negtypes.NegSyncerKey, committedEndpoints map[string]negtypes.NetworkEndpointSet, migrationCount int) {
	// First record when migrations start/finish for this syncer...
	sm.updateMigrationStartAndEndTime(key, migrationCount)
	// ...then refresh the per-type endpoint counts consumed by the gauges.
	sm.updateEndpointsCountPerType(key, committedEndpoints, migrationCount)
}

// updateMigrationStartAndEndTime maintains the per-syncer migration start and
// end timestamps based on whether any endpoints are still migrating
// (migrationCount != 0).
func (sm *SyncerMetrics) updateMigrationStartAndEndTime(key negtypes.NegSyncerKey, migrationCount int) {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	_, started := sm.dualStackMigrationStartTime[key]
	_, ended := sm.dualStackMigrationEndTime[key]

	if migrationCount != 0 {
		//
		// Migration has started or it was already in progress.
		//
		if ended {
			// Endpoints resumed migrating after a recorded finish; treat this
			// as an extension of the earlier migration window by discarding
			// the stale end time.
			delete(sm.dualStackMigrationEndTime, key)
		}
		if !started {
			sm.dualStackMigrationStartTime[key] = sm.clock.Now()
		}
		return
	}

	//
	// Migration has finished or it never started.
	//
	// Record a finish time exactly once, and only when a start was actually
	// observed; repeated invocations with no ongoing migration are no-ops.
	if started && !ended {
		sm.dualStackMigrationEndTime[key] = sm.clock.Now()
	}
}

// updateEndpointsCountPerType recomputes and stores the endpoint
// classification counts (IPv4-only, IPv6-only, dual stack, migrating) for the
// given syncer, replacing any previously stored counts.
func (sm *SyncerMetrics) updateEndpointsCountPerType(key negtypes.NegSyncerKey, committedEndpoints map[string]negtypes.NetworkEndpointSet, migrationCount int) {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	// Start with explicit zeros so all four types are always present in the
	// stored map; the migration count is supplied directly by the caller.
	counts := map[string]int{
		ipv4EndpointType:      0,
		ipv6EndpointType:      0,
		dualStackEndpointType: 0,
		migrationEndpointType: migrationCount,
	}
	for _, endpointSet := range committedEndpoints {
		for endpoint := range endpointSet {
			hasIPv4 := endpoint.IP != ""
			hasIPv6 := endpoint.IPv6 != ""
			switch {
			case hasIPv4 && hasIPv6:
				counts[dualStackEndpointType]++
			case hasIPv4:
				counts[ipv4EndpointType]++
			case hasIPv6:
				counts[ipv6EndpointType]++
			}
		}
	}
	sm.endpointsCountPerType[key] = counts
}

func (sm *SyncerMetrics) computeDualStackMigrationDurations() ([]int, int) {
sm.mu.Lock()
defer sm.mu.Unlock()

finishedDurations, longestUnfinishedDuration := make([]int, 0), 0
swetharepakula marked this conversation as resolved.
Show resolved Hide resolved
for key, startTime := range sm.dualStackMigrationStartTime {
endTime, ok := sm.dualStackMigrationEndTime[key]
if !ok {
if curUnfinishedDuration := int(sm.clock.Since(startTime).Seconds()); curUnfinishedDuration > longestUnfinishedDuration {
longestUnfinishedDuration = curUnfinishedDuration
}
continue
}
finishedDurations = append(finishedDurations, int(endTime.Sub(startTime).Seconds()))
// Prevent metrics from being re-emitted by deleting the syncer key whose
// migrations have finished.
delete(sm.dualStackMigrationStartTime, key)
delete(sm.dualStackMigrationEndTime, key)
}

return finishedDurations, longestUnfinishedDuration
}

// computeDualStackMigrationCounts aggregates the stored per-syncer endpoint
// counts and returns (1) the number of syncers managing at least one endpoint
// of each type, (2) the total number of migration endpoints, and (3) the
// number of distinct Services (namespace/name) that have migration endpoints.
func (sm *SyncerMetrics) computeDualStackMigrationCounts() (map[string]int, int, int) {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	// It's important to explicitly initialize all types to zero so that their
	// counts get reset when the metrics are published.
	syncerCountByEndpointType := map[string]int{
		ipv4EndpointType:      0,
		ipv6EndpointType:      0,
		dualStackEndpointType: 0,
		migrationEndpointType: 0,
	}
	migrationEndpointCount := 0
	migrationServices := sets.NewString()

	for syncerKey, countsByType := range sm.endpointsCountPerType {
		// A syncer contributes to a type's count only when it manages a
		// non-zero number of endpoints of that type.
		for endpointType, count := range countsByType {
			if count == 0 {
				continue
			}
			syncerCountByEndpointType[endpointType]++
		}

		if count := countsByType[migrationEndpointType]; count != 0 {
			migrationEndpointCount += count
			migrationServices.Insert(fmt.Sprintf("%s/%s", syncerKey.Namespace, syncerKey.Name))
		}
	}
	return syncerCountByEndpointType, migrationEndpointCount, migrationServices.Len()
}
Loading