Skip to content

Commit

Permalink
inhibit the 2nd minor version upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
hongkailiu committed Aug 29, 2024
1 parent ce6c0b8 commit 0c7bbfe
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 35 deletions.
46 changes: 27 additions & 19 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,9 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
resultChannelCount++
go func() {
defer utilruntime.HandleCrash()
wait.UntilWithContext(runContext, func(runContext context.Context) { optr.worker(runContext, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second)
wait.UntilWithContext(runContext, func(runContext context.Context) {
optr.worker(runContext, optr.upgradeableQueue, optr.upgradeableSyncFunc(false))
}, time.Second)
resultChannel <- asyncResult{name: "upgradeable"}
}()

Expand All @@ -464,6 +466,10 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
if err := optr.sync(shutdownContext, optr.queueKey()); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
}
// This is to ensure upgradeableCondition to be synced and thus to avoid the race caused by the throttle
if err := optr.upgradeableSyncFunc(true)(shutdownContext, optr.queueKey()); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to perform final upgradeable sync: %v", err))
}
}, time.Second)
resultChannel <- asyncResult{name: "cluster version sync"}
}()
Expand Down Expand Up @@ -738,27 +744,29 @@ func (optr *Operator) availableUpdatesSync(ctx context.Context, key string) erro
return optr.syncAvailableUpdates(ctx, config)
}

// upgradeableSync is triggered on cluster version change (and periodic requeues) to
// upgradeableSyncFunc returns a function that is triggered on cluster version change (and periodic requeues) to
// sync upgradeableCondition. It only modifies cluster version.
func (optr *Operator) upgradeableSync(_ context.Context, key string) error {
startTime := time.Now()
klog.V(2).Infof("Started syncing upgradeable %q", key)
defer func() {
klog.V(2).Infof("Finished syncing upgradeable %q (%v)", key, time.Since(startTime))
}()
func (optr *Operator) upgradeableSyncFunc(ignoreThrottlePeriod bool) func(_ context.Context, key string) error {
return func(_ context.Context, key string) error {
startTime := time.Now()
klog.V(2).Infof("Started syncing upgradeable %q", key)
defer func() {
klog.V(2).Infof("Finished syncing upgradeable %q (%v)", key, time.Since(startTime))
}()

config, err := optr.cvLister.Get(optr.name)
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
if errs := validation.ValidateClusterVersion(config); len(errs) > 0 {
return nil
}
config, err := optr.cvLister.Get(optr.name)
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
if errs := validation.ValidateClusterVersion(config); len(errs) > 0 {
return nil
}

return optr.syncUpgradeable(config)
return optr.syncUpgradeable(config, ignoreThrottlePeriod)
}
}

// isOlderThanLastUpdate returns true if the cluster version is older than
Expand Down
2 changes: 1 addition & 1 deletion pkg/cvo/cvo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3906,7 +3906,7 @@ func TestOperator_upgradeableSync(t *testing.T) {
waitForCm(t, cms)
}

err = optr.upgradeableSync(ctx, optr.queueKey())
err = optr.upgradeableSyncFunc(false)(ctx, optr.queueKey())
if err != nil && tt.wantErr == nil {
t.Fatalf("Operator.sync() unexpected error: %v", err)
}
Expand Down
64 changes: 57 additions & 7 deletions pkg/cvo/upgradeable.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func defaultUpgradeableCheckIntervals() upgradeableCheckIntervals {

// syncUpgradeable synchronizes the upgradeable status only if the sufficient time passed since its last update. This
// throttling period is dynamic and is driven by upgradeableCheckIntervals.
func (optr *Operator) syncUpgradeable(cv *configv1.ClusterVersion) error {
if u := optr.getUpgradeable(); u != nil {
func (optr *Operator) syncUpgradeable(cv *configv1.ClusterVersion, ignoreThrottlePeriod bool) error {
if u := optr.getUpgradeable(); u != nil && !ignoreThrottlePeriod {
throttleFor := optr.upgradeableCheckIntervals.throttlePeriod(cv)
if earliestNext := u.At.Add(throttleFor); time.Now().Before(earliestNext) {
klog.V(2).Infof("Upgradeability last checked %s ago, will not re-check until %s", time.Since(u.At), earliestNext.Format(time.RFC3339))
Expand All @@ -80,16 +80,24 @@ func (optr *Operator) syncUpgradeable(cv *configv1.ClusterVersion) error {
func (optr *Operator) setUpgradeableConditions() {
now := metav1.Now()
var conds []configv1.ClusterOperatorStatusCondition
var minorVersionClusterUpgradeInProgressCondition *configv1.ClusterOperatorStatusCondition
var reasons []string
var msgs []string
klog.V(4).Infof("Checking upgradeability conditions")
for _, check := range optr.upgradeableChecks {
if cond := check.Check(); cond != nil {
reasons = append(reasons, cond.Reason)
msgs = append(msgs, cond.Message)
cond.LastTransitionTime = now
conds = append(conds, *cond)
klog.V(2).Infof("Upgradeability condition failed (type='%s' reason='%s' message='%s')", cond.Type, cond.Reason, cond.Message)
if cond.Type != clusterversion.MinorVersionClusterUpgradeInProgress {
reasons = append(reasons, cond.Reason)
msgs = append(msgs, cond.Message)
cond.LastTransitionTime = now
conds = append(conds, *cond)
klog.V(2).Infof("Upgradeability condition failed (type='%s' reason='%s' message='%s')", cond.Type, cond.Reason, cond.Message)
} else {
// MinorVersionClusterUpgradeInProgress inhibit only the 2nd minor upgrade. For example, it still allows for patch upgrades on top the ongoing minor upgrade.
cond.LastTransitionTime = now
minorVersionClusterUpgradeInProgressCondition = cond
klog.V(2).Infof("MinorVersionClusterUpgradeInProgress condition found (type='%s' status='%s' reason='%s' message='%s')", cond.Type, cond.Status, cond.Reason, cond.Message)
}
}
}
if len(conds) == 1 {
Expand All @@ -111,6 +119,9 @@ func (optr *Operator) setUpgradeableConditions() {
} else {
klog.V(2).Infof("All upgradeability conditions are passing")
}
if minorVersionClusterUpgradeInProgressCondition != nil {
conds = append(conds, *minorVersionClusterUpgradeInProgressCondition)
}
sort.Slice(conds, func(i, j int) bool { return conds[i].Type < conds[j].Type })
optr.setUpgradeable(&upgradeable{
Conditions: conds,
Expand Down Expand Up @@ -263,6 +274,44 @@ func (check *clusterVersionOverridesUpgradeable) Check() *configv1.ClusterOperat
return cond
}

type minorVersionClusterUpgradeInProgressUpgradeable struct {
name string
cvLister configlistersv1.ClusterVersionLister
}

func (check *minorVersionClusterUpgradeInProgressUpgradeable) Check() *configv1.ClusterOperatorStatusCondition {
cond := &configv1.ClusterOperatorStatusCondition{
Type: clusterversion.MinorVersionClusterUpgradeInProgress,
Status: configv1.ConditionTrue,
}

cv, err := check.cvLister.Get(check.name)
if meta.IsNoMatchError(err) || apierrors.IsNotFound(err) {
return nil
}

currentVersion := clusterversion.GetCurrentVersion(cv.Status.History)
// This can occur in early start up when the configmap is first added and version history
// has not yet been populated.
if currentVersion == "" {
klog.V(2).Infof("current version is empty")
return nil
}

currentMinor := clusterversion.GetEffectiveMinor(currentVersion)
desiredMinor := clusterversion.GetEffectiveMinor(cv.Status.Desired.Version)
klog.V(2).Infof("The current minor version is %s and the desired minor version is %s", currentMinor, desiredMinor)
if !clusterversion.MinorVersionUpgrade(currentMinor, desiredMinor) {
return nil
}

message := fmt.Sprintf("There is a minor level upgrade from %s to %s in progress.", currentVersion, cv.Status.Desired.Version)
klog.V(2).Info(cond.Message)
cond.Reason = "MinorVersionClusterUpgradeInProgress"
cond.Message = message
return cond
}

type clusterManifestDeleteInProgressUpgradeable struct {
}

Expand Down Expand Up @@ -421,6 +470,7 @@ func (optr *Operator) defaultUpgradeableChecks() []upgradeableCheck {
},
&clusterOperatorsUpgradeable{coLister: optr.coLister},
&clusterManifestDeleteInProgressUpgradeable{},
&minorVersionClusterUpgradeInProgressUpgradeable{name: optr.name, cvLister: optr.cvLister},
}
}

Expand Down
20 changes: 17 additions & 3 deletions pkg/payload/precondition/clusterversion/upgradeable.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clusterversion

import (
"context"
"fmt"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -46,6 +47,8 @@ func ClusterVersionOverridesCondition(cv *configv1.ClusterVersion) *configv1.Clu
return nil
}

const MinorVersionClusterUpgradeInProgress = configv1.ClusterStatusConditionType("UpgradeableMinorVersionClusterUpgradeInProgress")

// Run runs the Upgradeable precondition.
// If the feature gate `key` is not found, or the api for clusterversion doesn't exist, this check is inert and always returns nil error.
// Otherwise, if Upgradeable condition is set to false in the object, it returns an PreconditionError when possible.
Expand All @@ -63,6 +66,17 @@ func (pf *Upgradeable) Run(ctx context.Context, releaseContext precondition.Rele
}
}

if upgradeableMinorVersionClusterUpgradeInProgress := resourcemerge.FindOperatorStatusCondition(cv.Status.Conditions, MinorVersionClusterUpgradeInProgress); upgradeableMinorVersionClusterUpgradeInProgress != nil && upgradeableMinorVersionClusterUpgradeInProgress.Status == configv1.ConditionTrue {
desiredMinor := GetEffectiveMinor(releaseContext.DesiredVersion)
if minorInProgress := GetEffectiveMinor(cv.Status.Desired.Version); MinorVersionUpgrade(minorInProgress, desiredMinor) {
return &precondition.Error{
Reason: "MinorVersionClusterUpgradeInProgress",
Message: fmt.Sprintf("The minor level upgrade to %s is not recommended: %s. It is recommended to wait until the existing minor level upgrade completes.", releaseContext.DesiredVersion, upgradeableMinorVersionClusterUpgradeInProgress.Message),
Name: pf.Name(),
}
}
}

// if we are upgradeable==true we can always upgrade
up := resourcemerge.FindOperatorStatusCondition(cv.Status.Conditions, configv1.OperatorUpgradeable)
if up == nil {
Expand All @@ -87,7 +101,7 @@ func (pf *Upgradeable) Run(ctx context.Context, releaseContext precondition.Rele

// if there is no difference in the minor version (4.y.z where 4.y is the same for current and desired), then we can still upgrade
// if no cluster overrides have been set
if !minorVersionUpgrade(currentMinor, desiredMinor) {
if !MinorVersionUpgrade(currentMinor, desiredMinor) {
klog.V(2).Infof("Precondition %q passed: minor from the target %s is not a minor version update from the current %s.%s.", pf.Name(), releaseContext.DesiredVersion, currentVersion, currentMinor)
if condition := ClusterVersionOverridesCondition(cv); condition != nil {
klog.V(2).Infof("Update from %s to %s blocked by %s: %s", currentVersion, releaseContext.DesiredVersion, condition.Reason, condition.Message)
Expand Down Expand Up @@ -141,10 +155,10 @@ func GetEffectiveMinor(version string) string {
return splits[1]
}

// minorVersionUpgrade returns true if the the desired update minor version number is greater
// MinorVersionUpgrade returns true if the the desired update minor version number is greater
// than the current version minor version number. Errors resulting from either version
// number being unset or NaN are ignored simply resulting in false returned.
func minorVersionUpgrade(currentMinor string, desiredMinor string) bool {
func MinorVersionUpgrade(currentMinor string, desiredMinor string) bool {
if currentMinorNum, err := strconv.Atoi(currentMinor); err == nil {
if desiredMinorNum, err := strconv.Atoi(desiredMinor); err == nil {
if desiredMinorNum > currentMinorNum {
Expand Down
35 changes: 30 additions & 5 deletions pkg/payload/precondition/clusterversion/upgradeable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,13 @@ func TestUpgradeableRun(t *testing.T) {
}

tests := []struct {
name string
upgradeable *configv1.ConditionStatus
currVersion string
desiredVersion string
expected string
name string
upgradeable *configv1.ConditionStatus
currVersion string
desiredVersion string
desiredVersionInCV string
minorVersionClusterUpgradeInProgress bool
expected string
}{
{
name: "first",
Expand Down Expand Up @@ -105,6 +107,21 @@ func TestUpgradeableRun(t *testing.T) {
desiredVersion: "4.1.4",
expected: "",
},
{
name: "move-(y+1) while move-y is in progress",
currVersion: "4.6.3",
desiredVersionInCV: "4.7.2",
desiredVersion: "4.8.1",
minorVersionClusterUpgradeInProgress: true,
expected: "The minor level upgrade to 4.8.1 is not recommended: MinorVersionClusterUpgradeInProgress y to y+1. It is recommended to wait until the existing minor level upgrade completes.",
},
{
name: "move-y with z while move-y is in progress",
currVersion: "4.6.3",
desiredVersionInCV: "4.7.2",
minorVersionClusterUpgradeInProgress: true,
desiredVersion: "4.7.3",
},
}

for _, tc := range tests {
Expand All @@ -114,6 +131,7 @@ func TestUpgradeableRun(t *testing.T) {
Spec: configv1.ClusterVersionSpec{},
Status: configv1.ClusterVersionStatus{
History: []configv1.UpdateHistory{},
Desired: configv1.Release{Version: tc.desiredVersionInCV},
},
}
if len(tc.currVersion) > 0 {
Expand All @@ -126,6 +144,13 @@ func TestUpgradeableRun(t *testing.T) {
Message: fmt.Sprintf("set to %v", *tc.upgradeable),
})
}
if tc.minorVersionClusterUpgradeInProgress {
clusterVersion.Status.Conditions = append(clusterVersion.Status.Conditions, configv1.ClusterOperatorStatusCondition{
Type: MinorVersionClusterUpgradeInProgress,
Status: configv1.ConditionTrue,
Message: "MinorVersionClusterUpgradeInProgress y to y+1",
})
}
cvLister := fakeClusterVersionLister(t, clusterVersion)
instance := NewUpgradeable(cvLister)

Expand Down

0 comments on commit 0c7bbfe

Please sign in to comment.