Skip to content

Commit

Permalink
Merge pull request kubernetes#2618 from 08volt/my-release-1.30-2
Browse files Browse the repository at this point in the history
Release v1.30 -[Cherry-pick kubernetes#2582 kubernetes#2610 -> 1.30] Fix instance group size bug and add 2 new metrics to monitor resize events
  • Loading branch information
k8s-ci-robot authored Aug 7, 2024
2 parents 85b5cf1 + f9719cf commit 7fe0e31
Show file tree
Hide file tree
Showing 5 changed files with 336 additions and 130 deletions.
164 changes: 50 additions & 114 deletions pkg/instancegroups/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"strings"
"time"

metrics "k8s.io/ingress-gce/pkg/instancegroups/metrics"

"google.golang.org/api/compute/v1"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/events"
Expand Down Expand Up @@ -190,34 +192,6 @@ func (m *manager) DeleteInstanceGroup(name string) error {
return fmt.Errorf("%v", errs)
}

// listIGInstances lists all instances of provided instance group name in all zones.
// The return format will be a set of nodes in the instance group and
// a map from node name to zone.
func (m *manager) listIGInstances(name string) (sets.String, map[string]string, error) {
nodeNames := sets.NewString()
nodeZoneMap := make(map[string]string)
zones, err := m.ZoneGetter.ListZones(zonegetter.AllNodesFilter, m.logger)
if err != nil {
return nodeNames, nodeZoneMap, err
}

for _, zone := range zones {
instances, err := m.cloud.ListInstancesInInstanceGroup(name, zone, allInstances)
if err != nil {
return nodeNames, nodeZoneMap, err
}
for _, ins := range instances {
name, err := utils.KeyName(ins.Instance)
if err != nil {
return nodeNames, nodeZoneMap, err
}
nodeNames.Insert(name)
nodeZoneMap[name] = zone
}
}
return nodeNames, nodeZoneMap, nil
}

// Get returns the Instance Group by name.
func (m *manager) Get(name, zone string) (*compute.InstanceGroup, error) {
ig, err := m.cloud.GetInstanceGroup(name, zone)
Expand Down Expand Up @@ -288,63 +262,32 @@ func (m *manager) getInstanceReferences(zone string, nodeNames []string) (refs [
}

// Add adds the given instances to the appropriately zoned Instance Group.
func (m *manager) add(groupName string, names []string) error {
events.GlobalEventf(m.recorder, core.EventTypeNormal, events.AddNodes, "Adding %s to InstanceGroup %q", events.TruncatedStringList(names), groupName)
var errs []error
for zone, nodeNames := range m.splitNodesByZone(names) {
m.logger.V(1).Info("Adding nodes to instance group in zone", "nodeCount", len(nodeNames), "name", groupName, "zone", zone)
if err := m.cloud.AddInstancesToInstanceGroup(groupName, zone, m.getInstanceReferences(zone, nodeNames)); err != nil {
errs = append(errs, err)
}
}
if len(errs) == 0 {
return nil
func (m *manager) add(groupName string, nodeNames []string, zone string) error {
events.GlobalEventf(m.recorder, core.EventTypeNormal, events.AddNodes, "Adding %s to InstanceGroup %q", events.TruncatedStringList(nodeNames), groupName)
m.logger.V(1).Info("Adding nodes to instance group in zone", "nodeCount", len(nodeNames), "name", groupName, "zone", zone)
err := m.cloud.AddInstancesToInstanceGroup(groupName, zone, m.getInstanceReferences(zone, nodeNames))
if err != nil && !utils.IsMemberAlreadyExistsError(err) {
events.GlobalEventf(m.recorder, core.EventTypeWarning, events.AddNodes, "Error adding %s to InstanceGroup %q: %v", events.TruncatedStringList(nodeNames), groupName, err)
return err
}

err := fmt.Errorf("AddInstances: %v", errs)
events.GlobalEventf(m.recorder, core.EventTypeWarning, events.AddNodes, "Error adding %s to InstanceGroup %q: %v", events.TruncatedStringList(names), groupName, err)
return err
return nil
}

// Remove removes the given instances from the appropriately zoned Instance Group.
func (m *manager) remove(groupName string, names []string, nodeZoneMap map[string]string) error {
events.GlobalEventf(m.recorder, core.EventTypeNormal, events.RemoveNodes, "Removing %s from InstanceGroup %q", events.TruncatedStringList(names), groupName)
var errs []error

// Get the zone information from nameZoneMap instead of ZoneGetter.
// Since the ZoneGetter is based on k8s nodes but in most remove cases,
// k8s nodes do not exist. It will be impossible to get zone infromation.
nodesByZone := map[string][]string{}
for _, name := range names {
zone, ok := nodeZoneMap[name]
if !ok {
m.logger.Error(nil, "Failed to get zones for node, skipping", "name", name)
continue
}
if _, ok := nodesByZone[zone]; !ok {
nodesByZone[zone] = []string{}
}
nodesByZone[zone] = append(nodesByZone[zone], name)
}
func (m *manager) remove(groupName string, nodeNames []string, zone string) error {
events.GlobalEventf(m.recorder, core.EventTypeNormal, events.RemoveNodes, "Removing %s from InstanceGroup %q", events.TruncatedStringList(nodeNames), groupName)

for zone, nodeNames := range nodesByZone {
m.logger.V(1).Info("Removing nodes from instance group in zone", "nodeCount", len(nodeNames), "name", groupName, "zone", zone)
if err := m.cloud.RemoveInstancesFromInstanceGroup(groupName, zone, m.getInstanceReferences(zone, nodeNames)); err != nil {
errs = append(errs, err)
}
}
if len(errs) == 0 {
return nil
m.logger.V(1).Info("Removing nodes from instance group in zone", "nodeCount", len(nodeNames), "name", groupName, "zone", zone)
if err := m.cloud.RemoveInstancesFromInstanceGroup(groupName, zone, m.getInstanceReferences(zone, nodeNames)); err != nil {
events.GlobalEventf(m.recorder, core.EventTypeWarning, events.RemoveNodes, "Error removing nodes %s from InstanceGroup %q: %v", events.TruncatedStringList(nodeNames), groupName, err)
return err
}

err := fmt.Errorf("RemoveInstances: %v", errs)
events.GlobalEventf(m.recorder, core.EventTypeWarning, events.RemoveNodes, "Error removing nodes %s from InstanceGroup %q: %v", events.TruncatedStringList(names), groupName, err)
return err
return nil
}

// Sync nodes with the instances in the instance group.
func (m *manager) Sync(nodes []string) (err error) {
m.logger.V(2).Info("Syncing nodes", "nodes", nodes)
m.logger.V(2).Info("Syncing nodes", "nodes", events.TruncatedStringList(nodes))

defer func() {
// The node pool is only responsible for syncing nodes to instance
Expand All @@ -359,65 +302,58 @@ func (m *manager) Sync(nodes []string) (err error) {
}
}()

pool, err := m.List()
if err != nil {
m.logger.Error(err, "List error")
return err
}
// For each zone add up to #m.maxIGSize number of nodes to the instance group
// If there is more then truncate last nodes (in alphabetical order)
// the logic should be consistent with cloud-provider-gcp's Legacy L4 ILB Controller:
// https://github.com/kubernetes/cloud-provider-gcp/blob/fca628cb3bf9267def0abb509eaae87d2d4040f3/providers/gce/gce_loadbalancer_internal.go#L606C1-L675C1
// the m.maxIGSize should be set to 1000 as is in the cloud-provider-gcp.
zonedNodes := m.splitNodesByZone(nodes)
for zone, kubeNodesFromZone := range zonedNodes {
igName := m.namer.InstanceGroup()
if len(kubeNodesFromZone) > m.maxIGSize {
sortedKubeNodesFromZone := sets.NewString(kubeNodesFromZone...).List()
loggableNodeList := events.TruncatedStringList(sortedKubeNodesFromZone[m.maxIGSize:])
m.logger.Info(fmt.Sprintf("Total number of kubeNodes: %d, truncating to maximum Instance Group size = %d. zone: %s. First truncated instances: %v", len(kubeNodesFromZone), m.maxIGSize, zone, loggableNodeList))
kubeNodesFromZone = sortedKubeNodesFromZone[:m.maxIGSize]
}

kubeNodes := sets.NewString(kubeNodesFromZone...)

for _, igName := range pool {
// Keep the zone information for each node in this map.
// This will be used as a reference to get zone information
// when removing nodes.
gceNodes, gceNodeZoneMap, err := m.listIGInstances(igName)
gceNodes := sets.NewString()
instances, err := m.cloud.ListInstancesInInstanceGroup(igName, zone, allInstances)
if err != nil {
m.logger.Error(err, "listIGInstances error", "name", igName)
m.logger.Error(err, "Failed to list instance from instance group", "zone", zone, "igName", igName)
return err
}
kubeNodes := sets.NewString(nodes...)

// Individual InstanceGroup has a limit for 1000 instances in it.
// As a result, it's not possible to add more to it.
if len(kubeNodes) > m.maxIGSize {
// List() will return a sorted list so the kubeNodesList truncation will have a stable set of nodes.
kubeNodesList := kubeNodes.List()

// Store first 10 truncated nodes for logging
truncateForLogs := func(nodes []string) []string {
maxLogsSampleSize := 10
if len(nodes) <= maxLogsSampleSize {
return nodes
}
return nodes[:maxLogsSampleSize]
for _, ins := range instances {
instance, err := utils.KeyName(ins.Instance)
if err != nil {
m.logger.Error(err, "Failed to read instance name from ULR, skipping single instance", "Instance URL", ins.Instance)
}

m.logger.Info(fmt.Sprintf("Total number of kubeNodes: %d, truncating to maximum Instance Group size = %d. Instance group name: %s. First truncated instances: %v", len(kubeNodesList), m.maxIGSize, igName, truncateForLogs(nodes[m.maxIGSize:])))
kubeNodes = sets.NewString(kubeNodesList[:m.maxIGSize]...)
gceNodes.Insert(instance)
}

// A node deleted via kubernetes could still exist as a gce vm. We don't
// want to route requests to it. Similarly, a node added to kubernetes
// needs to get added to the instance group so we do route requests to it.

removeNodes := gceNodes.Difference(kubeNodes).List()
addNodes := kubeNodes.Difference(gceNodes).List()

m.logger.V(2).Info("Removing nodes", "removeNodes", removeNodes)
m.logger.V(2).Info("Adding nodes", "addNodes", addNodes)
m.logger.V(2).Info("Removing nodes", "removeNodes", events.TruncatedStringList(removeNodes))
m.logger.V(2).Info("Adding nodes", "addNodes", events.TruncatedStringList(removeNodes))

start := time.Now()
if len(removeNodes) != 0 {
err = m.remove(igName, removeNodes, gceNodeZoneMap)
m.logger.V(2).Info("Remove finished", "name", igName, "err", err, "timeTaken", time.Now().Sub(start), "removeNodes", removeNodes)
metrics.PublishInstanceGroupRemove(len(removeNodes))
err = m.remove(igName, removeNodes, zone)
m.logger.V(2).Info("Remove finished", "name", igName, "err", err, "timeTaken", time.Now().Sub(start), "removeNodes", events.TruncatedStringList(removeNodes))
if err != nil {
return err
}
}

start = time.Now()
if len(addNodes) != 0 {
err = m.add(igName, addNodes)
m.logger.V(2).Info("Add finished", "name", igName, "err", err, "timeTaken", time.Now().Sub(start), "addNodes", addNodes)
metrics.PublishInstanceGroupAdd(len(addNodes))
err = m.add(igName, addNodes, zone)
m.logger.V(2).Info("Add finished", "name", igName, "err", err, "timeTaken", time.Now().Sub(start), "addNodes", events.TruncatedStringList(addNodes))
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 7fe0e31

Please sign in to comment.