Change spec.Parallel field with spec.MaxUnavailable #715

Merged (3 commits) on Mar 22, 2021
Changes from all commits
12 changes: 8 additions & 4 deletions api/shared/nodenetworkconfigurationpolicy_types.go
@@ -1,5 +1,7 @@
package shared

import "k8s.io/apimachinery/pkg/util/intstr"

// NodeNetworkConfigurationPolicySpec defines the desired state of NodeNetworkConfigurationPolicy
type NodeNetworkConfigurationPolicySpec struct {
// NodeSelector is a selector which must be true for the policy to be applied to the node.
@@ -12,18 +14,20 @@ type NodeNetworkConfigurationPolicySpec struct {
// The desired configuration of the policy
DesiredState State `json:"desiredState,omitempty"`

// When set to true, changes are applied to all nodes in parallel
// MaxUnavailable specifies percentage or number
// of machines that can be updating at a time. Default is "50%".
// +optional
Parallel bool `json:"parallel,omitempty"`
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`
}

// NodeNetworkConfigurationPolicyStatus defines the observed state of NodeNetworkConfigurationPolicy
type NodeNetworkConfigurationPolicyStatus struct {
Conditions ConditionList `json:"conditions,omitempty" optional:"true"`

// NodeRunningUpdate field is used for serializing cluster nodes configuration when Parallel flag is false
// UnavailableNodeCount represents the total number of potentially unavailable nodes that are
// processing a NodeNetworkConfigurationPolicy
// +optional
NodeRunningUpdate string `json:"nodeRunningUpdate,omitempty" optional:"true"`
UnavailableNodeCount int `json:"unavailableNodeCount,omitempty" optional:"true"`
}

const (
93 changes: 64 additions & 29 deletions controllers/nodenetworkconfigurationpolicy_controller.go
@@ -20,7 +20,6 @@ package controllers
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"reflect"
"time"

@@ -40,6 +39,8 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"

nmstateapi "github.com/nmstate/kubernetes-nmstate/api/shared"
@@ -48,11 +49,16 @@ import (
enactmentconditions "github.com/nmstate/kubernetes-nmstate/pkg/enactmentstatus/conditions"
"github.com/nmstate/kubernetes-nmstate/pkg/environment"
nmstate "github.com/nmstate/kubernetes-nmstate/pkg/helper"
"github.com/nmstate/kubernetes-nmstate/pkg/node"
"github.com/nmstate/kubernetes-nmstate/pkg/policyconditions"
"github.com/nmstate/kubernetes-nmstate/pkg/selectors"
"k8s.io/apimachinery/pkg/types"
)

const (
DEFAULT_MAXUNAVAILABLE = "50%"
)

var (
nodeName string
nodeRunningUpdateRetryTime = 5 * time.Second
@@ -154,7 +160,26 @@ func (r *NodeNetworkConfigurationPolicyReconciler) initializeEnactment(policy nm
})
}

func (r *NodeNetworkConfigurationPolicyReconciler) enactmentsCountsByPolicy(policy *nmstatev1beta1.NodeNetworkConfigurationPolicy) (enactmentconditions.ConditionCount, error) {
func (r *NodeNetworkConfigurationPolicyReconciler) maxUnavailableNodeCount(policy *nmstatev1beta1.NodeNetworkConfigurationPolicy) (int, error) {
nmstateNodes, err := node.NodesRunningNmstate(r.Client)
if err != nil {
return 0, err
}
intOrPercent := intstr.FromString(DEFAULT_MAXUNAVAILABLE)
if policy.Spec.MaxUnavailable != nil {
intOrPercent = *policy.Spec.MaxUnavailable
}
maxUnavailable, err := intstr.GetScaledValueFromIntOrPercent(&intOrPercent, len(nmstateNodes), true)
if err != nil {
return 0, err
}
if maxUnavailable < 1 {
maxUnavailable = 1
}
return maxUnavailable, nil
}

func (r *NodeNetworkConfigurationPolicyReconciler) enactmentsCountByPolicy(policy *nmstatev1beta1.NodeNetworkConfigurationPolicy) (enactmentconditions.ConditionCount, error) {
enactments := nmstatev1beta1.NodeNetworkConfigurationEnactmentList{}
policyLabelFilter := client.MatchingLabels{nmstateapi.EnactmentPolicyLabel: policy.GetName()}
err := r.Client.List(context.TODO(), &enactments, policyLabelFilter)
@@ -165,33 +190,44 @@ func (r *NodeNetworkConfigurationPolicyReconciler) enactmentsCountsByPolicy(poli
return enactmentCount, nil
}

func (r *NodeNetworkConfigurationPolicyReconciler) claimNodeRunningUpdate(policy *nmstatev1beta1.NodeNetworkConfigurationPolicy) error {
func (r *NodeNetworkConfigurationPolicyReconciler) incrementUnavailableNodeCount(policy *nmstatev1beta1.NodeNetworkConfigurationPolicy) error {
policyKey := types.NamespacedName{Name: policy.GetName(), Namespace: policy.GetNamespace()}
err := r.Client.Get(context.TODO(), policyKey, policy)
if err != nil {
return err
}
if policy.Status.NodeRunningUpdate != "" {
return apierrors.NewConflict(schema.GroupResource{Resource: "nodenetworkconfigurationpolicies"}, policy.Name, fmt.Errorf("Another node is working on configuration"))
maxUnavailable, err := r.maxUnavailableNodeCount(policy)
if err != nil {
return err
}
policy.Status.NodeRunningUpdate = nodeName
if policy.Status.UnavailableNodeCount >= maxUnavailable {
return apierrors.NewConflict(schema.GroupResource{Resource: "nodenetworkconfigurationpolicies"}, policy.Name, fmt.Errorf("maximal number of %d nodes are already processing policy configuration", policy.Status.UnavailableNodeCount))
}
policy.Status.UnavailableNodeCount += 1
err = r.Client.Status().Update(context.TODO(), policy)
if err != nil {
return err
}
return nil
}

func (r *NodeNetworkConfigurationPolicyReconciler) releaseNodeRunningUpdate(policyKey types.NamespacedName) {
func (r *NodeNetworkConfigurationPolicyReconciler) decrementUnavailableNodeCount(policy *nmstatev1beta1.NodeNetworkConfigurationPolicy) {
policyKey := types.NamespacedName{Name: policy.GetName(), Namespace: policy.GetNamespace()}
instance := &nmstatev1beta1.NodeNetworkConfigurationPolicy{}
_ = retry.RetryOnConflict(retry.DefaultRetry, func() error {
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
err := r.Client.Get(context.TODO(), policyKey, instance)
if err != nil {
return err
}
instance.Status.NodeRunningUpdate = ""
if instance.Status.UnavailableNodeCount <= 0 {
return fmt.Errorf("no unavailable nodes")
}
instance.Status.UnavailableNodeCount -= 1
return r.Client.Status().Update(context.TODO(), instance)
})
if err != nil {
r.Log.Error(err, "error decrementing unavailableNodeCount")
}
}

// Reconcile reads that state of the cluster for a NodeNetworkConfigurationPolicy object and makes changes based on the state read
@@ -245,28 +281,27 @@ func (r *NodeNetworkConfigurationPolicyReconciler) Reconcile(ctx context.Context
}

enactmentConditions.NotifyMatching()
if !instance.Spec.Parallel {
enactmentCount, err := r.enactmentsCountsByPolicy(instance)
if err != nil {
log.Error(err, "Error getting enactment counts")
return ctrl.Result{}, nil
}
if enactmentCount.Failed() > 0 {
err = fmt.Errorf("policy has failing enactments, aborting")
log.Error(err, "")
enactmentConditions.NotifyAborted(err)
return ctrl.Result{}, nil
}
err = r.claimNodeRunningUpdate(instance)
if err != nil {
if apierrors.IsConflict(err) {
return ctrl.Result{RequeueAfter: nodeRunningUpdateRetryTime}, err
} else {
return ctrl.Result{}, err
}

enactmentCount, err := r.enactmentsCountByPolicy(instance)
if err != nil {
log.Error(err, "Error getting enactment counts")
return ctrl.Result{}, err
}
if enactmentCount.Failed() > 0 {
err = fmt.Errorf("policy has failing enactments, aborting")
log.Error(err, "")
enactmentConditions.NotifyAborted(err)
return ctrl.Result{}, nil
}

err = r.incrementUnavailableNodeCount(instance)
if err != nil {
if apierrors.IsConflict(err) {
return ctrl.Result{RequeueAfter: nodeRunningUpdateRetryTime}, err
}
defer r.releaseNodeRunningUpdate(request.NamespacedName)
return ctrl.Result{}, err
}
defer r.decrementUnavailableNodeCount(instance)

enactmentConditions.NotifyProgressing()
nmstateOutput, err := nmstate.ApplyDesiredState(r.Client, instance.Spec.DesiredState)
38 changes: 22 additions & 16 deletions deploy/crds/nmstate.io_nodenetworkconfigurationpolicies.yaml
@@ -49,17 +49,20 @@ spec:
description: The desired configuration of the policy
type: object
x-kubernetes-preserve-unknown-fields: true
maxUnavailable:
anyOf:
- type: integer
- type: string
description: MaxUnavailable specifies percentage or number of machines
that can be updating at a time. Default is "50%".
x-kubernetes-int-or-string: true
nodeSelector:
additionalProperties:
type: string
description: 'NodeSelector is a selector which must be true for the
policy to be applied to the node. Selector which must match a node''s
labels for the policy to be scheduled on that node. More info: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/'
type: object
parallel:
description: When set to true, changes are applied to all nodes in
parallel
type: boolean
type: object
status:
description: NodeNetworkConfigurationPolicyStatus defines the observed
Expand Down Expand Up @@ -87,10 +90,10 @@ spec:
- type
type: object
type: array
nodeRunningUpdate:
description: NodeRunningUpdate field is used for serializing cluster
nodes configuration when Parallel flag is false
type: string
unavailableNodeCount:
description: UnavailableNodeCount represents the total number of potentially
unavailable nodes that are processing a NodeNetworkConfigurationPolicy
type: integer
type: object
type: object
served: true
@@ -128,17 +131,20 @@ spec:
description: The desired configuration of the policy
type: object
x-kubernetes-preserve-unknown-fields: true
maxUnavailable:
anyOf:
- type: integer
- type: string
description: MaxUnavailable specifies percentage or number of machines
that can be updating at a time. Default is "50%".
x-kubernetes-int-or-string: true
nodeSelector:
additionalProperties:
type: string
description: 'NodeSelector is a selector which must be true for the
policy to be applied to the node. Selector which must match a node''s
labels for the policy to be scheduled on that node. More info: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/'
type: object
parallel:
description: When set to true, changes are applied to all nodes in
parallel
type: boolean
type: object
status:
description: NodeNetworkConfigurationPolicyStatus defines the observed
Expand Down Expand Up @@ -166,10 +172,10 @@ spec:
- type
type: object
type: array
nodeRunningUpdate:
description: NodeRunningUpdate field is used for serializing cluster
nodes configuration when Parallel flag is false
type: string
unavailableNodeCount:
description: UnavailableNodeCount represents the total number of potentially
unavailable nodes that are processing a NodeNetworkConfigurationPolicy
type: integer
type: object
type: object
served: true
32 changes: 32 additions & 0 deletions docs/user-guide/102-configuration.md
@@ -462,6 +462,38 @@ status:
type: Matching
```

## Configuring multiple nodes concurrently

By default, policy configuration is applied sequentially, one node at a time.
This strategy is safe and prevents the entire cluster from becoming temporarily
unavailable if the applied configuration breaks network connectivity.

For big clusters, however, it may take a long time for the configuration to finish.
In such a case, `maxUnavailable` can be used to define how large a portion of the
cluster may apply a policy configuration concurrently.
`maxUnavailable` accepts either a percentage or an absolute number of nodes that
may be progressing a policy at a time; the default is "50%" of the cluster nodes.
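
As an illustration (a sketch, not part of this change set), a percentage value is
scaled against the number of nodes currently running the nmstate handler and
rounded up, never dropping below one node:

```yaml
# Hypothetical spec fragment; only the relevant field is shown.
spec:
  maxUnavailable: "30%"  # on 6 nmstate nodes: 6 * 0.30 = 1.8, rounded up to 2 concurrent nodes
```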

The following policy specifies that up to 3 nodes may be progressing concurrently:

```yaml
{% include_absolute 'user-guide/linux-bridge_maxunavailable.yaml' %}
```

```shell
kubectl apply -f linux-bridge_maxunavailable.yaml
```
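
Nodes over the limit are requeued and retried (every 5 seconds by default) until one
of the progressing nodes finishes and the unavailable count drops. The progress can
be observed on the policy enactments, for example (assuming the default `nnce` short
name for NodeNetworkConfigurationEnactment is available):

```shell
kubectl get nnce
```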

```
NAME STATUS
node01.linux-bridge-maxunavailable AllSelectorsMatching
node02.linux-bridge-maxunavailable ConfigurationProgressing
node03.linux-bridge-maxunavailable SuccessfullyConfigured
node04.linux-bridge-maxunavailable ConfigurationProgressing
node05.linux-bridge-maxunavailable ConfigurationProgressing
node06.linux-bridge-maxunavailable AllSelectorsMatching
```

## Continue reading

The following tutorial will guide you through troubleshooting a failed
22 changes: 22 additions & 0 deletions docs/user-guide/linux-bridge_maxunavailable.yaml
@@ -0,0 +1,22 @@
# When updating this file, don't forget to update the tutorial.
apiVersion: nmstate.io/v1beta1
kind: NodeNetworkConfigurationPolicy
metadata:
name: linux-bridge-maxunavailable
spec:
maxUnavailable: 3
desiredState:
interfaces:
- name: br1
description: Linux bridge with eth1 as a port
type: linux-bridge
state: up
ipv4:
dhcp: true
enabled: true
bridge:
options:
stp:
enabled: false
port:
- name: eth1
2 changes: 1 addition & 1 deletion go.mod
@@ -4,7 +4,6 @@ go 1.15

require (
github.com/Masterminds/semver v1.5.0
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/github-release/github-release v0.10.0
github.com/go-logr/logr v0.3.0
@@ -20,6 +19,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/qinqon/kube-admission-webhook v0.14.0
github.com/tidwall/gjson v1.6.8
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.20.2
k8s.io/apimachinery v0.20.2
k8s.io/client-go v12.0.0+incompatible
36 changes: 36 additions & 0 deletions pkg/node/nodes.go
@@ -0,0 +1,36 @@
package node

import (
"context"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/pkg/errors"
)

func NodesRunningNmstate(cli client.Client) ([]corev1.Node, error) {
nodes := corev1.NodeList{}
err := cli.List(context.TODO(), &nodes)
if err != nil {
return []corev1.Node{}, errors.Wrap(err, "getting nodes failed")
}

pods := corev1.PodList{}
byApp := client.MatchingLabels{"app": "kubernetes-nmstate"}
err = cli.List(context.TODO(), &pods, byApp)
if err != nil {
return []corev1.Node{}, errors.Wrap(err, "getting pods failed")
}

filteredNodes := []corev1.Node{}
for _, node := range nodes.Items {
for _, pod := range pods.Items {
if node.Name == pod.Spec.NodeName {
filteredNodes = append(filteredNodes, node)
break
}
}
}
return filteredNodes, nil
}