This repository has been archived by the owner on Jan 11, 2023. It is now read-only.

fixing agent pool upgrade logic #1858

Merged
merged 8 commits on Dec 1, 2017
Changes from 7 commits
30 changes: 14 additions & 16 deletions pkg/operations/kubernetesupgrade/upgradeagentnode.go
@@ -16,6 +16,7 @@ import (

const (
interval = time.Second * 1
retry = time.Second * 5
timeout = time.Minute * 10
)

@@ -112,30 +113,27 @@ func (kan *UpgradeAgentNode) Validate(vmName *string) error {
return err
}

ch := make(chan struct{}, 1)
go func() {
for {
retryTimer := time.NewTimer(time.Millisecond)
timeoutTimer := time.NewTimer(timeout)
for {
select {
case <-timeoutTimer.C:
retryTimer.Stop()
kan.logger.Errorf("Node was not ready within %v", timeout)
return fmt.Errorf("Node was not ready within %v", timeout)
A contributor left a review comment on this hunk:
[nit] err := fmt.Errorf("Node was not ready within %v", timeout)
kan.logger.Error(err.Error())
return err

case <-retryTimer.C:
agentNode, err := client.GetNode(*vmName)
if err != nil {
kan.logger.Infof("Agent VM: %s status error: %v\n", *vmName, err)
time.Sleep(time.Second * 5)
retryTimer.Reset(retry)
} else if node.IsNodeReady(agentNode) {
kan.logger.Infof("Agent VM: %s is ready", *vmName)
ch <- struct{}{}
timeoutTimer.Stop()
return nil
} else {
kan.logger.Infof("Agent VM: %s not ready yet...", *vmName)
time.Sleep(time.Second * 5)
retryTimer.Reset(retry)
}
}
}()

for {
select {
case <-ch:
return nil
case <-time.After(timeout):
kan.logger.Errorf("Node was not ready within %v", timeout)
return fmt.Errorf("Node was not ready within %v", timeout)
}
}
}
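For context on the change above: the old Validate implementation polled in a background goroutine with time.Sleep and signalled readiness over a channel, racing a time.After in the caller; the new version keeps everything in one loop, pacing retries with a retry timer and bounding the whole wait with a timeout timer. Below is a minimal standalone sketch of that pattern — the pollUntilReady/isReady names and the shortened intervals are illustrative, not acs-engine code (the PR uses retry = 5s and timeout = 10min).

```go
package main

import (
	"errors"
	"fmt"
	"log"
	"time"
)

// Shortened intervals so the sketch finishes quickly.
const (
	retry   = 100 * time.Millisecond
	timeout = 2 * time.Second
)

// pollUntilReady waits for isReady to report success, retrying on a fixed
// interval and giving up after the overall timeout, all in a single goroutine.
func pollUntilReady(isReady func() (bool, error)) error {
	// Fire the first poll almost immediately, then fall back to the retry interval.
	retryTimer := time.NewTimer(time.Millisecond)
	timeoutTimer := time.NewTimer(timeout)
	for {
		select {
		case <-timeoutTimer.C:
			retryTimer.Stop()
			return fmt.Errorf("node was not ready within %v", timeout)
		case <-retryTimer.C:
			ready, err := isReady()
			switch {
			case err != nil:
				log.Printf("status error: %v, retrying", err)
				retryTimer.Reset(retry)
			case ready:
				timeoutTimer.Stop()
				return nil
			default:
				log.Println("not ready yet, retrying")
				retryTimer.Reset(retry)
			}
		}
	}
}

func main() {
	attempts := 0
	err := pollUntilReady(func() (bool, error) {
		attempts++
		if attempts < 3 {
			return false, errors.New("transient API error")
		}
		return true, nil
	})
	fmt.Println("result:", err) // result: <nil>
}
```

Besides being simpler, this shape avoids the old version's polling goroutine, which had no exit path and kept running (and eventually blocked on the channel send) after Validate returned.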
158 changes: 97 additions & 61 deletions pkg/operations/kubernetesupgrade/upgrader.go
@@ -19,10 +19,17 @@ type Upgrader struct {
kubeConfig string
}

// UpgradeVM holds VM name and upgrade status
type UpgradeVM struct {
Name string
Upgraded bool
type vmStatus int

const (
vmStatusUpgraded vmStatus = iota
vmStatusNotUpgraded
vmStatusIgnored
)

type vmInfo struct {
name string
status vmStatus
}

// Init initializes an upgrader struct
@@ -56,14 +63,14 @@ func (ku *Upgrader) upgradeMasterNodes() error {
if ku.ClusterTopology.DataModel.Properties.MasterProfile == nil {
return nil
}
ku.logger.Infof("Master nodes StorageProfile: %s\n", ku.ClusterTopology.DataModel.Properties.MasterProfile.StorageProfile)
ku.logger.Infof("Master nodes StorageProfile: %s", ku.ClusterTopology.DataModel.Properties.MasterProfile.StorageProfile)
// Upgrade Master VMs
templateMap, parametersMap, err := ku.generateUpgradeTemplate(ku.ClusterTopology.DataModel)
if err != nil {
return ku.Translator.Errorf("error generating upgrade template: %s", err.Error())
}

ku.logger.Infof("Prepping master nodes for upgrade...\n")
ku.logger.Infof("Prepping master nodes for upgrade...")

transformer := &acsengine.Transformer{
Translator: ku.Translator,
@@ -88,46 +95,46 @@ func (ku *Upgrader) upgradeMasterNodes() error {
mastersUpgradedCount := len(*ku.ClusterTopology.UpgradedMasterVMs)
mastersToUgradeCount := expectedMasterCount - mastersUpgradedCount

ku.logger.Infof("Total expected master count: %d\n", expectedMasterCount)
ku.logger.Infof("Master nodes that need to be upgraded: %d\n", mastersToUgradeCount)
ku.logger.Infof("Master nodes that have been upgraded: %d\n", mastersUpgradedCount)
ku.logger.Infof("Total expected master count: %d", expectedMasterCount)
ku.logger.Infof("Master nodes that need to be upgraded: %d", mastersToUgradeCount)
ku.logger.Infof("Master nodes that have been upgraded: %d", mastersUpgradedCount)

ku.logger.Infof("Starting upgrade of master nodes...\n")
ku.logger.Infof("Starting upgrade of master nodes...")

masterNodesInCluster := len(*ku.ClusterTopology.MasterVMs) + mastersUpgradedCount
ku.logger.Infof("masterNodesInCluster: %d\n", masterNodesInCluster)
ku.logger.Infof("masterNodesInCluster: %d", masterNodesInCluster)
if masterNodesInCluster > expectedMasterCount {
return ku.Translator.Errorf("Total count of master VMs: %d exceeded expected count: %d", masterNodesInCluster, expectedMasterCount)
}

upgradedMastersIndex := make(map[int]bool)

for _, vm := range *ku.ClusterTopology.UpgradedMasterVMs {
ku.logger.Infof("Master VM: %s is upgraded to expected orchestrator version\n", *vm.Name)
ku.logger.Infof("Master VM: %s is upgraded to expected orchestrator version", *vm.Name)
masterIndex, _ := armhelpers.GetVMNameIndex(vm.StorageProfile.OsDisk.OsType, *vm.Name)
upgradedMastersIndex[masterIndex] = true
}

for _, vm := range *ku.ClusterTopology.MasterVMs {
ku.logger.Infof("Upgrading Master VM: %s\n", *vm.Name)
ku.logger.Infof("Upgrading Master VM: %s", *vm.Name)

masterIndex, _ := armhelpers.GetVMNameIndex(vm.StorageProfile.OsDisk.OsType, *vm.Name)

err := upgradeMasterNode.DeleteNode(vm.Name, false)
if err != nil {
ku.logger.Infof("Error deleting master VM: %s, err: %v\n", *vm.Name, err)
ku.logger.Infof("Error deleting master VM: %s, err: %v", *vm.Name, err)
return err
}

err = upgradeMasterNode.CreateNode("master", masterIndex)
if err != nil {
ku.logger.Infof("Error creating upgraded master VM: %s\n", *vm.Name)
ku.logger.Infof("Error creating upgraded master VM: %s", *vm.Name)
return err
}

err = upgradeMasterNode.Validate(vm.Name)
if err != nil {
ku.logger.Infof("Error validating upgraded master VM: %s\n", *vm.Name)
ku.logger.Infof("Error validating upgraded master VM: %s", *vm.Name)
return err
}

@@ -138,11 +145,11 @@ func (ku *Upgrader) upgradeMasterNodes() error {
// VM upgrade when a master VM was deleted but creation of upgraded master did not run.
if masterNodesInCluster < expectedMasterCount {
ku.logger.Infof(
"Found missing master VMs in the cluster. Reconstructing names of missing master VMs for recreation during upgrade...\n")
"Found missing master VMs in the cluster. Reconstructing names of missing master VMs for recreation during upgrade...")
}

mastersToCreate := expectedMasterCount - masterNodesInCluster
ku.logger.Infof("Expected master count: %d, Creating %d more master VMs\n", expectedMasterCount, mastersToCreate)
ku.logger.Infof("Expected master count: %d, Creating %d more master VMs", expectedMasterCount, mastersToCreate)

// NOTE: this is NOT completely idempotent because it assumes that
// the OS disk has been deleted
@@ -152,18 +159,18 @@ func (ku *Upgrader) upgradeMasterNodes() error {
masterIndexToCreate++
}

ku.logger.Infof("Creating upgraded master VM with index: %d\n", masterIndexToCreate)
ku.logger.Infof("Creating upgraded master VM with index: %d", masterIndexToCreate)

err = upgradeMasterNode.CreateNode("master", masterIndexToCreate)
if err != nil {
ku.logger.Infof("Error creating upgraded master VM with index: %d\n", masterIndexToCreate)
ku.logger.Infof("Error creating upgraded master VM with index: %d", masterIndexToCreate)
return err
}

tempVMName := ""
err = upgradeMasterNode.Validate(&tempVMName)
if err != nil {
ku.logger.Infof("Error validating upgraded master VM with index: %d\n", masterIndexToCreate)
ku.logger.Infof("Error validating upgraded master VM with index: %d", masterIndexToCreate)
return err
}

@@ -182,7 +189,7 @@ func (ku *Upgrader) upgradeAgentPools() error {
return ku.Translator.Errorf("error generating upgrade template: %s", err.Error())
}

ku.logger.Infof("Prepping agent pool: %s for upgrade...\n", *agentPool.Name)
ku.logger.Infof("Prepping agent pool '%s' for upgrade...", *agentPool.Name)

preservePools := map[string]bool{*agentPool.Name: true}
transformer := &acsengine.Transformer{
@@ -210,6 +217,11 @@ func (ku *Upgrader) upgradeAgentPools() error {
}
}

if agentCount == 0 {
ku.logger.Infof("Agent pool '%s' is empty", *agentPool.Name)
return nil
}

upgradeAgentNode := UpgradeAgentNode{
Translator: ku.Translator,
logger: ku.logger,
@@ -221,98 +233,122 @@ func (ku *Upgrader) upgradeAgentPools() error {
upgradeAgentNode.Client = ku.Client
upgradeAgentNode.kubeConfig = ku.kubeConfig

upgradeVMs := make(map[int]*UpgradeVM)

// Go over upgraded VMs and verify provisioning state. Delete the VM if the state is not 'Succeeded'.
// Such a VM will be re-created later in this function.
agentVMs := make(map[int]*vmInfo)
// Go over upgraded VMs and verify provisioning state
// per https://docs.microsoft.com/en-us/rest/api/compute/virtualmachines/virtualmachines-state :
// - Creating: Indicates the virtual Machine is being created.
// - Updating: Indicates that there is an update operation in progress on the Virtual Machine.
// - Succeeded: Indicates that the operation executed on the virtual machine succeeded.
// - Deleting: Indicates that the virtual machine is being deleted.
// - Failed: Indicates that the update operation on the Virtual Machine failed.
// Delete VMs in 'bad' state. Such VMs will be re-created later in this function.
upgradedCount := 0
for _, vm := range *agentPool.UpgradedAgentVMs {
ku.logger.Infof("Agent VM: %s, pool name: %s on expected orchestrator version\n", *vm.Name, *agentPool.Name)
ku.logger.Infof("Agent VM: %s, pool name: %s on expected orchestrator version", *vm.Name, *agentPool.Name)
var vmProvisioningState string
if vm.VirtualMachineProperties != nil && vm.VirtualMachineProperties.ProvisioningState != nil {
vmProvisioningState = *vm.VirtualMachineProperties.ProvisioningState
}
agentIndex, _ := armhelpers.GetVMNameIndex(vm.StorageProfile.OsDisk.OsType, *vm.Name)

switch vmProvisioningState {
case "Creating", "Updating", "Succeeded":
agentVMs[agentIndex] = &vmInfo{*vm.Name, vmStatusUpgraded}
upgradedCount++

if vmProvisioningState != string(api.Succeeded) {
ku.logger.Infof("Deleting agent VM %s in provisioning state %s\n", *vm.Name, vmProvisioningState)
case "Failed":
ku.logger.Infof("Deleting agent VM %s in provisioning state %s", *vm.Name, vmProvisioningState)
err := upgradeAgentNode.DeleteNode(vm.Name, false)
if err != nil {
ku.logger.Errorf("Error deleting agent VM %s: %v\n", *vm.Name, err)
ku.logger.Errorf("Error deleting agent VM %s: %v", *vm.Name, err)
return err
}
} else {
agentIndex, _ := armhelpers.GetVMNameIndex(vm.StorageProfile.OsDisk.OsType, *vm.Name)
upgradeVMs[agentIndex] = &UpgradeVM{*vm.Name, true}

case "Deleting":
fallthrough
default:
ku.logger.Infof("Ignoring agent VM %s in provisioning state %s", *vm.Name, vmProvisioningState)
agentVMs[agentIndex] = &vmInfo{*vm.Name, vmStatusIgnored}
}
}

ku.logger.Infof("Starting upgrade of agent nodes in pool identifier: %s, name: %s...\n",
*agentPool.Identifier, *agentPool.Name)

for _, vm := range *agentPool.AgentVMs {
agentIndex, _ := armhelpers.GetVMNameIndex(vm.StorageProfile.OsDisk.OsType, *vm.Name)
upgradeVMs[agentIndex] = &UpgradeVM{*vm.Name, false}
agentVMs[agentIndex] = &vmInfo{*vm.Name, vmStatusNotUpgraded}
}
toBeUpgradedCount := len(*agentPool.AgentVMs)

ku.logger.Infof("Starting upgrade of %d agent nodes (out of %d) in pool identifier: %s, name: %s...",
toBeUpgradedCount, agentCount, *agentPool.Identifier, *agentPool.Name)

// Create missing nodes *plus* one extra node, which will be used to take on the load from upgrading nodes.
// In a normal mode of operation the actual number of VMs in the pool [len(upgradeVMs)] is equal to agentCount.
// However, if the upgrade failed in the middle, the actual number of VMs might differ.
for agentCount > 0 && len(upgradeVMs) <= agentCount {
agentIndex := getAvailableIndex(upgradeVMs)
// Create missing nodes to match agentCount. This could be due to previous upgrade failure
// If there are nodes that need to be upgraded, create one extra node, which will be used to take on the load from upgrading nodes.
if toBeUpgradedCount > 0 {
agentCount++
}
for upgradedCount+toBeUpgradedCount < agentCount {
agentIndex := getAvailableIndex(agentVMs)

vmName, err := armhelpers.GetK8sVMName(agentOsType, ku.DataModel.Properties.HostedMasterProfile != nil,
ku.NameSuffix, agentPoolName, agentPoolIndex, agentIndex)
if err != nil {
ku.logger.Errorf("Error reconstructing agent VM name with index %d: %v\n", agentIndex, err)
ku.logger.Errorf("Error reconstructing agent VM name with index %d: %v", agentIndex, err)
return err
}
ku.logger.Infof("Adding agent node %s (index %d)", vmName, agentIndex)
ku.logger.Infof("Creating new agent node %s (index %d)", vmName, agentIndex)

err = upgradeAgentNode.CreateNode(*agentPool.Name, agentIndex)
if err != nil {
ku.logger.Errorf("Error creating agent node %s (index %d): %v\n", vmName, agentIndex, err)
ku.logger.Errorf("Error creating agent node %s (index %d): %v", vmName, agentIndex, err)
return err
}

err = upgradeAgentNode.Validate(&vmName)
if err != nil {
ku.logger.Infof("Error validating agent node %s (index %d): %v\n", vmName, agentIndex, err)
ku.logger.Infof("Error validating agent node %s (index %d): %v", vmName, agentIndex, err)
return err
}

upgradeVMs[agentIndex] = &UpgradeVM{vmName, true}
agentVMs[agentIndex] = &vmInfo{vmName, vmStatusUpgraded}
upgradedCount++
}

if toBeUpgradedCount == 0 {
ku.logger.Infof("No nodes to upgrade")
return nil
}

// Upgrade nodes in agent pool
upgradedCount, toBeUpgraded := 0, len(*agentPool.AgentVMs)
for agentIndex, vm := range upgradeVMs {
if vm.Upgraded {
upgradedCount = 0
for agentIndex, vm := range agentVMs {
if vm.status != vmStatusNotUpgraded {
continue
}
ku.logger.Infof("Upgrading Agent VM: %s, pool name: %s\n", vm.Name, *agentPool.Name)
ku.logger.Infof("Upgrading Agent VM: %s, pool name: %s", vm.name, *agentPool.Name)

err := upgradeAgentNode.DeleteNode(&vm.Name, true)
err := upgradeAgentNode.DeleteNode(&vm.name, true)
if err != nil {
ku.logger.Errorf("Error deleting agent VM %s: %v\n", vm.Name, err)
ku.logger.Errorf("Error deleting agent VM %s: %v", vm.name, err)
return err
}

// do not create last node in favor of already created extra node.
if upgradedCount == toBeUpgraded-1 {
ku.logger.Infof("Skipping creation of VM %s (index %d)", vm.Name, agentIndex)
delete(upgradeVMs, agentIndex)
if upgradedCount == toBeUpgradedCount-1 {
ku.logger.Infof("Skipping creation of VM %s (index %d)", vm.name, agentIndex)
delete(agentVMs, agentIndex)
} else {
err = upgradeAgentNode.CreateNode(*agentPool.Name, agentIndex)
if err != nil {
ku.logger.Errorf("Error creating upgraded agent VM %s: %v\n", vm.Name, err)
ku.logger.Errorf("Error creating upgraded agent VM %s: %v", vm.name, err)
return err
}

err = upgradeAgentNode.Validate(&vm.Name)
err = upgradeAgentNode.Validate(&vm.name)
if err != nil {
ku.logger.Errorf("Error validating upgraded agent VM %s: %v\n", vm.Name, err)
ku.logger.Errorf("Error validating upgraded agent VM %s: %v", vm.name, err)
return err
}
vm.Upgraded = true
vm.status = vmStatusUpgraded
}
upgradedCount++
}
@@ -348,7 +384,7 @@ func (ku *Upgrader) generateUpgradeTemplate(upgradeContainerService *api.Contain
}

// return unused index within the range of agent indices, or subsequent index
func getAvailableIndex(vms map[int]*UpgradeVM) int {
func getAvailableIndex(vms map[int]*vmInfo) int {
maxIndex := 0

for indx := range vms {
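The remainder of getAvailableIndex is collapsed in this diff view. Based on its doc comment ("return unused index within the range of agent indices, or subsequent index") and the visible opening lines, a plausible reading is the sketch below — an assumption for illustration, not the file's actual code.

```go
package main

import "fmt"

// Stand-ins so the sketch compiles on its own.
type vmStatus int

type vmInfo struct {
	name   string
	status vmStatus
}

// getAvailableIndex returns an unused index within the range of existing
// agent indices, or the next index past the highest one in use.
func getAvailableIndex(vms map[int]*vmInfo) int {
	maxIndex := 0
	for indx := range vms {
		if indx > maxIndex {
			maxIndex = indx
		}
	}

	// Prefer a hole inside the existing range (left by a deleted VM).
	for indx := 0; indx < maxIndex; indx++ {
		if _, found := vms[indx]; !found {
			return indx
		}
	}

	// Otherwise append after the highest index currently in use.
	return maxIndex + 1
}

func main() {
	vms := map[int]*vmInfo{
		0: {name: "agent-0"},
		2: {name: "agent-2"},
	}
	fmt.Println(getAvailableIndex(vms)) // 1: reuses the gap left at index 1
}
```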