Skip to content

Commit

Permalink
feat: add startup taint removal feature
Browse files Browse the repository at this point in the history
fix
  • Loading branch information
andyzhangx committed Apr 29, 2024
1 parent d19dd50 commit 760761a
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 3 deletions.
Binary file modified charts/latest/azuredisk-csi-driver-v0.0.0.tgz
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ rules:
verbs: ["get"]
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "patch"]
- apiGroups: ["storage.k8s.io"]
resources: ["csinodes"]
verbs: ["get"]

---
Expand Down
4 changes: 3 additions & 1 deletion deploy/rbac-csi-azuredisk-node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ rules:
verbs: ["get"]
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "patch"]
- apiGroups: ["storage.k8s.io"]
resources: ["csinodes"]
verbs: ["get"]

---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
5 changes: 3 additions & 2 deletions pkg/azureconstants/azure_constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ const (
BlockDeviceRootPathLinux = "/sys/block"
DummyBlockDevicePathLinux = "/sys/block/sda"
// define different sleep time when hit throttling
SnapshotOpThrottlingSleepSec = 50
MaxThrottlingSleepSec = 1200
SnapshotOpThrottlingSleepSec = 50
MaxThrottlingSleepSec = 1200
AgentNotReadyNodeTaintKeySuffix = "/agent-not-ready"
)

var (
Expand Down
124 changes: 124 additions & 0 deletions pkg/azuredisk/azuredisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package azuredisk

import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
Expand All @@ -33,8 +34,11 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
Expand All @@ -54,6 +58,17 @@ import (
azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
)

var (
	// taintRemovalInitialDelay is how long to wait after driver setup before
	// the first attempt at removing the node taint.
	taintRemovalInitialDelay = time.Second
	// taintRemovalBackoff is the exponential backoff used when retrying node
	// taint removal: the delay starts at 500ms and doubles on every step, for
	// at most 10 steps (roughly 4 minutes of waiting overall).
	taintRemovalBackoff = wait.Backoff{
		Duration: 500 * time.Millisecond,
		Factor:   2,
		Steps:    10,
	}
)

// CSIDriver defines the interface for a CSI driver.
type CSIDriver interface {
csi.ControllerServer
Expand Down Expand Up @@ -106,6 +121,7 @@ type DriverCore struct {
forceDetachBackoff bool
endpoint string
disableAVSetNodes bool
removeNotReadyTaint bool
kubeClient kubernetes.Interface
// a timed cache storing volume stats <volumeID, volumeStats>
volStatsCache azcache.Resource
Expand Down Expand Up @@ -157,6 +173,7 @@ func newDriverV1(options *DriverOptions) *Driver {
driver.forceDetachBackoff = options.ForceDetachBackoff
driver.endpoint = options.Endpoint
driver.disableAVSetNodes = options.DisableAVSetNodes
driver.removeNotReadyTaint = options.RemoveNotReadyTaint
driver.volumeLocks = volumehelper.NewVolumeLocks()
driver.ioHandler = azureutils.NewOSIOHandler()
driver.hostUtil = hostutil.NewHostUtil()
Expand Down Expand Up @@ -277,6 +294,14 @@ func newDriverV1(options *DriverOptions) *Driver {
csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
csi.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
})

if kubeClient != nil && driver.removeNotReadyTaint {
// Remove taint from node to indicate driver startup success
// This is done at the last possible moment to prevent race conditions or false positive removals
time.AfterFunc(taintRemovalInitialDelay, func() {
removeTaintInBackground(kubeClient, driver.NodeID, driver.Name, taintRemovalBackoff, removeNotReadyTaint)
})
}
return &driver
}

Expand Down Expand Up @@ -636,3 +661,102 @@ func getVMSSInstanceName(computeName string) (string, error) {
}
return fmt.Sprintf("%s%06s", names[0], strconv.FormatInt(int64(instanceID), 36)), nil
}

// JSONPatch is one operation of a JSON Patch document, as serialized by
// encoding/json when building the node patch request.
type JSONPatch struct {
	// OP is the operation name (this file uses "test" and "replace").
	// It is left out of the JSON when empty.
	OP string `json:"op,omitempty"`
	// Path is the JSON pointer the operation targets, e.g. "/spec/taints".
	// It is left out of the JSON when empty.
	Path string `json:"path,omitempty"`
	// Value is the operand of the operation. It is always emitted, so a nil
	// value is serialized as JSON null rather than being dropped.
	Value interface{} `json:"value"`
}

// removeTaintInBackground keeps calling removalFunc for the given node and
// driver, spacing the attempts according to backoff, until one attempt
// succeeds or the backoff gives up. Failures are only logged; nothing is
// returned to the caller, so it is suitable for running off the main path.
func removeTaintInBackground(k8sClient kubernetes.Interface, nodeName, driverName string, backoff wait.Backoff, removalFunc func(kubernetes.Interface, string, string) error) {
	attempt := func() (bool, error) {
		if err := removalFunc(k8sClient, nodeName, driverName); err != nil {
			// Report "not done" without an error so the backoff loop retries
			// instead of aborting on the first failure.
			klog.ErrorS(err, "Unexpected failure when attempting to remove node taint(s)")
			return false, nil
		}
		return true, nil
	}

	if err := wait.ExponentialBackoff(backoff, attempt); err != nil {
		klog.ErrorS(err, "Retries exhausted, giving up attempting to remove node taint(s)")
	}
}

// removeNotReadyTaint strips the "<driverName>/agent-not-ready" taint
// (disk.csi.azure.com/agent-not-ready for this driver) from the named node.
// Users may optionally apply this taint to avoid startup race conditions such as
// https://github.com/kubernetes/kubernetes/issues/95911
//
// The node is fetched first, then checkAllocatable must succeed before any
// taint is touched. If the node does not carry the taint, nothing is patched
// and nil is returned. Any error from the API calls, from checkAllocatable, or
// from building the patch is returned unchanged.
func removeNotReadyTaint(clientset kubernetes.Interface, nodeName, driverName string) error {
	ctx := context.Background()

	node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	if err = checkAllocatable(ctx, clientset, nodeName, driverName); err != nil {
		return err
	}

	taintKeyToRemove := driverName + consts.AgentNotReadyNodeTaintKeySuffix
	klog.V(2).Infof("removing taint with key %s from local node %s", taintKeyToRemove, nodeName)

	currentTaints := node.Spec.Taints
	// Deliberately left nil when every taint is dropped; it is then
	// serialized as JSON null in the patch below.
	var remaining []corev1.Taint
	for _, t := range currentTaints {
		klog.V(5).Infof("checking taint key %s, value %s, effect %s", t.Key, t.Value, t.Effect)
		if t.Key == taintKeyToRemove {
			klog.V(2).Infof("queued taint for removal with key %s, effect %s", t.Key, t.Effect)
			continue
		}
		remaining = append(remaining, t)
	}

	// Same length means no taint matched the key, so there is nothing to patch.
	if len(remaining) == len(currentTaints) {
		klog.V(2).Infof("No taints to remove on node, skipping taint removal")
		return nil
	}

	// The leading "test" operation makes the patch apply only if the node's
	// taints still equal the list read above; "replace" then writes the
	// filtered list.
	patch, err := json.Marshal([]JSONPatch{
		{OP: "test", Path: "/spec/taints", Value: currentTaints},
		{OP: "replace", Path: "/spec/taints", Value: remaining},
	})
	if err != nil {
		return err
	}

	if _, err = clientset.CoreV1().Nodes().Patch(ctx, nodeName, k8stypes.JSONPatchType, patch, metav1.PatchOptions{}); err != nil {
		return err
	}
	klog.V(2).Infof("removed taint with key %s from local node %s successfully", taintKeyToRemove, nodeName)
	return nil
}

// checkAllocatable verifies that the CSINode object named nodeName lists
// driverName with an Allocatable count set.
//
// It returns nil when the driver entry is present and its Allocatable.Count is
// non-nil. It returns an error when the CSINode cannot be fetched, when the
// driver entry exists but has no allocatable count, or when the driver is not
// listed on the CSINode at all.
func checkAllocatable(ctx context.Context, clientset kubernetes.Interface, nodeName, driverName string) error {
	csiNode, err := clientset.StorageV1().CSINodes().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("isAllocatableSet: failed to get CSINode for %s: %w", nodeName, err)
	}

	for i := range csiNode.Spec.Drivers {
		entry := &csiNode.Spec.Drivers[i]
		if entry.Name != driverName {
			continue
		}
		// Only the first entry with a matching name is considered.
		if entry.Allocatable == nil || entry.Allocatable.Count == nil {
			return fmt.Errorf("isAllocatableSet: allocatable value not set for driver on node %s", nodeName)
		}
		klog.V(2).Infof("CSINode Allocatable value is set for driver on node %s, count %d", nodeName, *entry.Allocatable.Count)
		return nil
	}

	return fmt.Errorf("isAllocatableSet: driver not found on node %s", nodeName)
}
2 changes: 2 additions & 0 deletions pkg/azuredisk/azuredisk_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type DriverOptions struct {
Kubeconfig string
Endpoint string
DisableAVSetNodes bool
RemoveNotReadyTaint bool
}

func (o *DriverOptions) AddFlags() *flag.FlagSet {
Expand Down Expand Up @@ -100,6 +101,7 @@ func (o *DriverOptions) AddFlags() *flag.FlagSet {
fs.BoolVar(&o.ForceDetachBackoff, "force-detach-backoff", true, "boolean flag to force detach in disk detach backoff")
fs.StringVar(&o.Kubeconfig, "kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
fs.BoolVar(&o.DisableAVSetNodes, "disable-avset-nodes", false, "disable DisableAvailabilitySetNodes in cloud config for controller")
fs.BoolVar(&o.RemoveNotReadyTaint, "remove-not-ready-taint", true, "remove NotReady taint from node when node is ready")
fs.StringVar(&o.Endpoint, "endpoint", "unix://tmp/csi.sock", "CSI endpoint")

return fs
Expand Down

0 comments on commit 760761a

Please sign in to comment.