Ensure all patch calls can conflict when resource version doesn't match
jonathan-innis committed Sep 12, 2024
1 parent 0c46b4c commit a630780
Showing 16 changed files with 102 additions and 40 deletions.
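The same pattern repeats throughout the diff: writes that touch a list (taints, finalizers, status conditions) move to an optimistic-lock merge patch, and a resulting conflict is requeued rather than surfaced as an error. Below is a minimal sketch of that pattern, assuming a standard controller-runtime client; the helper name and the mutate callback are illustrative and not part of the commit.

```go
package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// patchWithOptimisticLock applies a mutation to a Node and writes it back with
// an optimistic-lock merge patch, requeueing on a 409 conflict instead of
// returning an error. Helper name and callback are illustrative only.
func patchWithOptimisticLock(ctx context.Context, kubeClient client.Client, node *corev1.Node, mutate func(*corev1.Node)) (reconcile.Result, error) {
	stored := node.DeepCopy()
	mutate(node) // e.g. append a taint, a finalizer, or a status condition

	if equality.Semantic.DeepEqual(stored, node) {
		return reconcile.Result{}, nil
	}
	// MergeFromWithOptimisticLock embeds the stored resourceVersion in the patch,
	// so a stale write fails with a conflict instead of clobbering a list that
	// another controller updated concurrently.
	if err := kubeClient.Patch(ctx, node, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
		if errors.IsConflict(err) {
			return reconcile.Result{Requeue: true}, nil // retry against a fresh read
		}
		return reconcile.Result{}, err
	}
	return reconcile.Result{}, nil
}
```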
20 changes: 13 additions & 7 deletions pkg/controllers/disruption/controller.go
@@ -19,15 +19,15 @@ package disruption
import (
"bytes"
"context"
"errors"
stderrors "errors"
"fmt"
"strings"
"sync"
"time"

"github.com/awslabs/operatorpkg/singleton"
"github.com/samber/lo"
"go.uber.org/multierr"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
@@ -133,6 +133,9 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, false, lo.Filter(c.cluster.Nodes(), func(s *state.StateNode, _ int) bool {
return !c.queue.HasAny(s.ProviderID())
})...); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("removing taint from nodes, %w", err)
}

@@ -141,6 +144,9 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
c.recordRun(fmt.Sprintf("%T", m))
success, err := c.disrupt(ctx, m)
if err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("disrupting via reason=%q, %w", strings.ToLower(string(m.Reason())), err)
}
if success {
@@ -202,7 +208,7 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
})
// Cordon the old nodes before we launch the replacements to prevent new pods from scheduling to the old nodes
if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, true, stateNodes...); err != nil {
return multierr.Append(fmt.Errorf("tainting nodes (command-id: %s), %w", commandID, err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
return fmt.Errorf("tainting nodes (command-id: %s), %w", commandID, err)
}

var nodeClaimNames []string
@@ -211,7 +217,7 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
if nodeClaimNames, err = c.createReplacementNodeClaims(ctx, m, cmd); err != nil {
// If we failed to launch the replacement, don't disrupt. If this is some permanent failure,
// we don't want to disrupt workloads with no way to provision new nodes for them.
return multierr.Append(fmt.Errorf("launching replacement nodeclaim (command-id: %s), %w", commandID, err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
return fmt.Errorf("launching replacement nodeclaim (command-id: %s), %w", commandID, err)
}
}

@@ -230,10 +236,10 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
// We have the new NodeClaims created at the API server so mark the old NodeClaims for deletion
c.cluster.MarkForDeletion(providerIDs...)

if err := c.queue.Add(orchestration.NewCommand(nodeClaimNames,
if err = c.queue.Add(orchestration.NewCommand(nodeClaimNames,
lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode { return c.StateNode }), commandID, m.Reason(), m.ConsolidationType())); err != nil {
c.cluster.UnmarkForDeletion(providerIDs...)
return fmt.Errorf("adding command to queue (command-id: %s), %w", commandID, multierr.Append(err, state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...)))
return fmt.Errorf("adding command to queue (command-id: %s), %w", commandID, err)
}

// An action is only performed and pods/nodes are only disrupted after a successful add to the queue
@@ -293,6 +299,6 @@ func (c *Controller) logInvalidBudgets(ctx context.Context) {
}
}
if buf.Len() > 0 {
log.FromContext(ctx).Error(errors.New(buf.String()), "detected disruption budget errors")
log.FromContext(ctx).Error(stderrors.New(buf.String()), "detected disruption budget errors")
}
}
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/orchestration/queue.go
@@ -202,7 +202,7 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) {
if err := q.waitOrTerminate(ctx, cmd); err != nil {
// If recoverable, re-queue and try again.
if !IsUnrecoverableError(err) {
// store the error that is causing us to fail so we can bubble it up later if this times out.
// store the error that is causing us to fail, so we can bubble it up later if this times out.
cmd.lastError = err
// mark this item as done processing. This is necessary so that the RLI is able to add the item back in.
q.RateLimitingInterface.Done(cmd)
18 changes: 12 additions & 6 deletions pkg/controllers/node/termination/controller.go
@@ -91,7 +91,7 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
return reconcile.Result{}, fmt.Errorf("listing nodeclaims, %w", err)
}

if err := c.deleteAllNodeClaims(ctx, nodeClaims...); err != nil {
if err = c.deleteAllNodeClaims(ctx, nodeClaims...); err != nil {
return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
}

@@ -100,10 +100,13 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
return reconcile.Result{}, err
}

if err := c.terminator.Taint(ctx, node, v1.DisruptedNoScheduleTaint); err != nil {
return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", v1.DisruptedTaintKey, err)
if err = c.terminator.Taint(ctx, node, v1.DisruptedNoScheduleTaint); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, client.IgnoreNotFound(fmt.Errorf("tainting node with %s, %w", v1.DisruptedTaintKey, err))
}
if err := c.terminator.Drain(ctx, node, nodeTerminationTime); err != nil {
if err = c.terminator.Drain(ctx, node, nodeTerminationTime); err != nil {
if !terminator.IsNodeDrainError(err) {
return reconcile.Result{}, fmt.Errorf("draining node, %w", err)
}
@@ -114,7 +117,7 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
// if the Node Ready condition is true
// Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
if _, err := c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
if _, err = c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
if cloudprovider.IsNodeClaimNotFoundError(err) {
return reconcile.Result{}, c.removeFinalizer(ctx, node)
}
@@ -231,8 +234,11 @@ func (c *Controller) removeFinalizer(ctx context.Context, n *corev1.Node) error
stored := n.DeepCopy()
controllerutil.RemoveFinalizer(n, v1.TerminationFinalizer)
if !equality.Semantic.DeepEqual(stored, n) {
// We use client.StrategicMergeFrom here since the node object supports it and
// a strategic merge patch represents the finalizer list as a keyed "set" so removing
// an item from the list doesn't replace the full list
if err := c.kubeClient.Patch(ctx, n, client.StrategicMergeFrom(stored)); err != nil {
return client.IgnoreNotFound(fmt.Errorf("patching node, %w", err))
return client.IgnoreNotFound(fmt.Errorf("removing finalizer, %w", err))
}

metrics.NodesTerminatedTotal.With(prometheus.Labels{
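The comment added to removeFinalizer above leans on ObjectMeta.finalizers using the merge patch strategy, so a strategic merge patch can delete one entry without rewriting the list. A small sketch of the difference, not taken from the repository (the node name and finalizer strings are illustrative):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

func main() {
	stored := &corev1.Node{ObjectMeta: metav1.ObjectMeta{
		Name:       "node-a",
		Finalizers: []string{"karpenter.sh/termination", "other.example.com/protect"},
	}}
	modified := stored.DeepCopy()
	controllerutil.RemoveFinalizer(modified, "karpenter.sh/termination")

	strategic, _ := client.StrategicMergeFrom(stored).Data(modified)
	merge, _ := client.MergeFrom(stored).Data(modified)

	fmt.Println(string(strategic)) // deletes the single entry from the keyed "set"
	fmt.Println(string(merge))     // sends the full remaining finalizers list instead
}
```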
5 changes: 4 additions & 1 deletion pkg/controllers/node/termination/terminator/terminator.go
@@ -74,7 +74,10 @@ func (t *Terminator) Taint(ctx context.Context, node *corev1.Node, taint corev1.
corev1.LabelNodeExcludeBalancers: "karpenter",
})
if !equality.Semantic.DeepEqual(node, stored) {
if err := t.kubeClient.Patch(ctx, node, client.StrategicMergeFrom(stored)); err != nil {
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the taint list
if err := t.kubeClient.Patch(ctx, node, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
return err
}
taintValues := []any{
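To see why the optimistic-lock variant can conflict at all, it helps to look at the patch body it generates. A sketch under assumed values: the node name, resourceVersion, and taint key are made up, with the taint key standing in for v1.DisruptedNoScheduleTaint from the diff.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func main() {
	stored := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-a", ResourceVersion: "42"}}
	modified := stored.DeepCopy()
	// "example.com/disrupted" stands in for v1.DisruptedNoScheduleTaint in the diff.
	modified.Spec.Taints = append(modified.Spec.Taints, corev1.Taint{
		Key:    "example.com/disrupted",
		Effect: corev1.TaintEffectNoSchedule,
	})

	data, err := client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{}).Data(modified)
	if err != nil {
		panic(err)
	}
	// The patch carries the resourceVersion it was computed against, roughly:
	// {"metadata":{"resourceVersion":"42"},"spec":{"taints":[{"effect":"NoSchedule","key":"example.com/disrupted"}]}}
	// so the API server rejects it with a 409 Conflict if the Node changed in between.
	fmt.Println(string(data))
}
```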
4 changes: 2 additions & 2 deletions pkg/controllers/nodeclaim/consistency/controller.go
@@ -102,10 +102,10 @@ func (c *Controller) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (re
return reconcile.Result{}, err
}
if !equality.Semantic.DeepEqual(stored, nodeClaim) {
// We call Update() here rather than Patch() because patching a list with a JSON merge patch
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the status condition list
if err = c.kubeClient.Status().Update(ctx, nodeClaim); client.IgnoreNotFound(err) != nil {
if err = c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
4 changes: 2 additions & 2 deletions pkg/controllers/nodeclaim/disruption/controller.go
@@ -93,10 +93,10 @@ func (c *Controller) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (re
results = append(results, res)
}
if !equality.Semantic.DeepEqual(stored, nodeClaim) {
// We call Update() here rather than Patch() because patching a list with a JSON merge patch
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the status condition list
if err := c.kubeClient.Status().Update(ctx, nodeClaim); err != nil {
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
9 changes: 8 additions & 1 deletion pkg/controllers/nodeclaim/lifecycle/controller.go
@@ -25,6 +25,7 @@ import (
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -87,7 +88,13 @@ func (c *Controller) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (re
stored := nodeClaim.DeepCopy()
controllerutil.AddFinalizer(nodeClaim, v1.TerminationFinalizer)
if !equality.Semantic.DeepEqual(nodeClaim, stored) {
if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFrom(stored)); err != nil {
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the finalizer list
if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, client.IgnoreNotFound(err)
}
}
2 changes: 1 addition & 1 deletion pkg/controllers/nodeclaim/lifecycle/initialization.go
@@ -74,7 +74,7 @@ func (i *Initialization) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim)
stored := node.DeepCopy()
node.Labels = lo.Assign(node.Labels, map[string]string{v1.NodeInitializedLabelKey: "true"})
if !equality.Semantic.DeepEqual(stored, node) {
if err = i.kubeClient.Patch(ctx, node, client.StrategicMergeFrom(stored)); err != nil {
if err = i.kubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil {
return reconcile.Result{}, err
}
}
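In initialization.go the patch base changes from a strategic merge patch to a plain JSON merge patch, presumably because the only change here is a label, i.e. a map entry: a JSON merge patch merges maps key by key, so there is no list to clobber in this hunk. A sketch of the generated patch body (the label key is illustrative, standing in for v1.NodeInitializedLabelKey):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func main() {
	stored := &corev1.Node{ObjectMeta: metav1.ObjectMeta{
		Name:   "node-a",
		Labels: map[string]string{"existing": "label"},
	}}
	modified := stored.DeepCopy()
	// "example.com/initialized" stands in for v1.NodeInitializedLabelKey in the diff.
	modified.Labels["example.com/initialized"] = "true"

	data, err := client.MergeFrom(stored).Data(modified)
	if err != nil {
		panic(err)
	}
	// Only the added label is sent; existing labels are left untouched:
	// {"metadata":{"labels":{"example.com/initialized":"true"}}}
	fmt.Println(string(data))
}
```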
5 changes: 4 additions & 1 deletion pkg/controllers/nodeclaim/lifecycle/registration.go
@@ -102,7 +102,10 @@ func (r *Registration) syncNode(ctx context.Context, nodeClaim *v1.NodeClaim, no
v1.NodeRegisteredLabelKey: "true",
})
if !equality.Semantic.DeepEqual(stored, node) {
if err := r.kubeClient.Update(ctx, node); err != nil {
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the taint list
if err := r.kubeClient.Patch(ctx, node, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
return fmt.Errorf("syncing node, %w", err)
}
}
2 changes: 1 addition & 1 deletion pkg/controllers/nodeclaim/podevents/controller.go
@@ -83,7 +83,7 @@ func (c *Controller) Reconcile(ctx context.Context, pod *corev1.Pod) (reconcile.
// otherwise, set the pod event time to now
nc.Status.LastPodEventTime.Time = c.clock.Now()
if !equality.Semantic.DeepEqual(stored, nc) {
if err := c.kubeClient.Status().Patch(ctx, nc, client.MergeFrom(stored)); err != nil {
if err = c.kubeClient.Status().Patch(ctx, nc, client.MergeFrom(stored)); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
}
10 changes: 5 additions & 5 deletions pkg/controllers/nodeclaim/termination/controller.go
@@ -84,7 +84,6 @@ func (c *Controller) Reconcile(ctx context.Context, n *v1.NodeClaim) (reconcile.
//nolint:gocyclo
func (c *Controller) finalize(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef("", nodeClaim.Status.NodeName), "provider-id", nodeClaim.Status.ProviderID))
stored := nodeClaim.DeepCopy()
if !controllerutil.ContainsFinalizer(nodeClaim, v1.TerminationFinalizer) {
return reconcile.Result{}, nil
}
@@ -130,12 +129,13 @@ func (c *Controller) finalize(ctx context.Context, nodeClaim *v1.NodeClaim) (rec
metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
}).Observe(time.Since(nodeClaim.StatusConditions().Get(v1.ConditionTypeInstanceTerminating).LastTransitionTime.Time).Seconds())
}
stored := nodeClaim.DeepCopy() // The NodeClaim may have been modified in the EnsureTerminated function
controllerutil.RemoveFinalizer(nodeClaim, v1.TerminationFinalizer)
if !equality.Semantic.DeepEqual(stored, nodeClaim) {
// We call Update() here rather than Patch() because patching a list with a JSON merge patch
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// https://github.com/kubernetes/kubernetes/issues/111643#issuecomment-2016489732
if err = c.kubeClient.Update(ctx, nodeClaim); err != nil {
// Here, we are updating the finalizer list
if err = c.kubeClient.Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
@@ -175,7 +175,7 @@ func (c *Controller) annotateTerminationGracePeriodTerminationTime(ctx context.C
nodeClaim.ObjectMeta.Annotations = lo.Assign(nodeClaim.ObjectMeta.Annotations, map[string]string{v1.NodeClaimTerminationTimestampAnnotationKey: terminationTime})

if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFrom(stored)); err != nil {
return client.IgnoreNotFound(fmt.Errorf("patching nodeclaim, %w", err))
return client.IgnoreNotFound(err)
}
log.FromContext(ctx).WithValues(v1.NodeClaimTerminationTimestampAnnotationKey, terminationTime).Info("annotated nodeclaim")
c.recorder.Publish(terminatorevents.NodeClaimTerminationGracePeriodExpiring(nodeClaim, terminationTime))
29 changes: 29 additions & 0 deletions pkg/controllers/nodeclaim/termination/suite_test.go
@@ -22,6 +22,10 @@ import (
"testing"
"time"

"github.com/awslabs/operatorpkg/object"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"

"sigs.k8s.io/karpenter/pkg/test/v1alpha1"

. "github.com/onsi/ginkgo/v2"
@@ -150,6 +154,31 @@ var _ = Describe("Termination", func() {
_, err = cloudProvider.Get(ctx, nodeClaim.Status.ProviderID)
Expect(cloudprovider.IsNodeClaimNotFoundError(err)).To(BeTrue())
})
It("should delete the NodeClaim when the spec resource.Quantity values will change during deserialization", func() {
nodeClaim.SetGroupVersionKind(object.GVK(nodeClaim)) // This is needed so that the GVK is set on the unstructured object
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(nodeClaim)
Expect(err).ToNot(HaveOccurred())
// Set a value in resources that will get converted to a value with a suffix, e.g. 50k
Expect(unstructured.SetNestedStringMap(u, map[string]string{"memory": "50000"}, "spec", "resources", "requests")).To(Succeed())

obj := &unstructured.Unstructured{}
Expect(runtime.DefaultUnstructuredConverter.FromUnstructured(u, obj)).To(Succeed())

ExpectApplied(ctx, env.Client, nodePool, obj)
nodeClaim := ExpectExists(ctx, env.Client, nodeClaim)

[Check failure on line 168 in pkg/controllers/nodeclaim/termination/suite_test.go, GitHub Actions / presubmit (1.25.x through 1.30.x): shadow: declaration of "nodeClaim" shadows declaration at line 98 (govet)]
ExpectObjectReconciled(ctx, env.Client, nodeClaimLifecycleController, nodeClaim)

// Expect the node and the nodeClaim to both be gone
Expect(env.Client.Delete(ctx, nodeClaim)).To(Succeed())
result := ExpectObjectReconciled(ctx, env.Client, nodeClaimTerminationController, nodeClaim) // triggers the nodeclaim deletion

Expect(result.RequeueAfter).To(BeEquivalentTo(5 * time.Second))
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeInstanceTerminating).IsTrue()).To(BeTrue())

ExpectObjectReconciled(ctx, env.Client, nodeClaimTerminationController, nodeClaim) // this will call cloudProvider Get to check if the instance is still around
ExpectNotFound(ctx, env.Client, nodeClaim)
})
It("should requeue reconciliation if cloudProvider Get returns an error other than NodeClaimNotFoundError", func() {
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimLifecycleController, nodeClaim)
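The new test above depends on resource.Quantity canonicalizing the raw value on the way back from the API server, so the decoded NodeClaim no longer DeepEquals what was written. A tiny sketch of that behavior:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	q := resource.MustParse("50000")
	// The canonical string form uses an SI suffix, so "50000" reads back as "50k",
	// which is the round-trip difference the test provokes.
	fmt.Println(q.String()) // 50k
}
```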
5 changes: 4 additions & 1 deletion pkg/controllers/nodepool/readiness/controller.go
@@ -72,7 +72,10 @@ func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reco
c.setReadyCondition(nodePool, nodeClass)
}
if !equality.Semantic.DeepEqual(stored, nodePool) {
if err = c.kubeClient.Status().Update(ctx, nodePool); client.IgnoreNotFound(err) != nil {
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the status condition list
if err = c.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
6 changes: 3 additions & 3 deletions pkg/controllers/nodepool/validation/controller.go
@@ -53,10 +53,10 @@ func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reco
nodePool.StatusConditions().SetTrue(v1.ConditionTypeValidationSucceeded)
}
if !equality.Semantic.DeepEqual(stored, nodePool) {
// We call Update() here rather than Patch() because patching a list with a JSON merge patch
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// https://github.com/kubernetes/kubernetes/issues/111643#issuecomment-2016489732
if e := c.kubeClient.Status().Update(ctx, nodePool); client.IgnoreNotFound(e) != nil {
// Here, we are updating the status condition list
if e := c.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(e) != nil {
if errors.IsConflict(e) {
return reconcile.Result{Requeue: true}, nil
}
5 changes: 4 additions & 1 deletion pkg/controllers/state/statenode.go
@@ -507,7 +507,10 @@ func RequireNoScheduleTaint(ctx context.Context, kubeClient client.Client, addTa
node.Spec.Taints = append(node.Spec.Taints, v1.DisruptedNoScheduleTaint)
}
if !equality.Semantic.DeepEqual(stored, node) {
if err := kubeClient.Patch(ctx, node, client.StrategicMergeFrom(stored)); err != nil {
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the taint list
if err := kubeClient.Patch(ctx, node, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
multiErr = multierr.Append(multiErr, fmt.Errorf("patching node %s, %w", node.Name, err))
}
}