From 9fe0e17aa31bd5a1fd1d5a1abb99aaf8873e02cf Mon Sep 17 00:00:00 2001
From: Omar Pakker
Date: Fri, 22 Mar 2024 20:02:52 +0100
Subject: [PATCH 1/2] Support any number of node names in affinity.

When we process our PVs, we shouldn't care about the number of nodes.
If none of them exists we can just clear it out, and if any one of them
does exist, we should skip it.
---
 pkg/common/common.go                      | 46 ++++---------
 pkg/common/common_test.go                 | 79 +++++++----------------
 pkg/node-cleanup/controller/controller.go | 30 ++++-----
 pkg/node-cleanup/deleter/deleter.go       | 20 ++----
 4 files changed, 55 insertions(+), 120 deletions(-)

diff --git a/pkg/common/common.go b/pkg/common/common.go
index 04bef2307..1115eb6ed 100644
--- a/pkg/common/common.go
+++ b/pkg/common/common.go
@@ -44,7 +44,6 @@ import (
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
 	"k8s.io/client-go/tools/record"
-	volumeUtil "k8s.io/kubernetes/pkg/volume/util"
 	"k8s.io/utils/mount"
 )
 
@@ -495,44 +494,25 @@ func GetVolumeMode(volUtil util.VolumeUtil, fullPath string) (v1.PersistentVolum
 	return "", fmt.Errorf("Block device check for %q failed: %s", fullPath, errblk)
 }
 
-// NodeExists checks to see if a Node exists in the Indexer of a NodeLister.
-// It tries to get the node and if it fails, it uses the well known label
-// `kubernetes.io/hostname` to find the Node.
-func NodeExists(nodeLister corelisters.NodeLister, nodeName string) (bool, error) {
-	_, err := nodeLister.Get(nodeName)
-	if errors.IsNotFound(err) {
+// AnyNodeExists checks to see if any of the given Nodes exists in the Indexer of a NodeLister.
+// If fetching a Node by name fails, it falls back to the well known label `kubernetes.io/hostname` to find it.
+// It aborts early, reporting true, if an unexpected error occurs, since it is then uncertain whether a Node exists or not.
+func AnyNodeExists(nodeLister corelisters.NodeLister, nodeNames []string) bool {
+	for _, nodeName := range nodeNames {
+		_, err := nodeLister.Get(nodeName)
+		if err == nil || !errors.IsNotFound(err) {
+			return true
+		}
 		req, err := labels.NewRequirement(NodeLabelKey, selection.Equals, []string{nodeName})
 		if err != nil {
-			return false, err
+			return true
 		}
 		nodes, err := nodeLister.List(labels.NewSelector().Add(*req))
-		if err != nil {
-			return false, err
+		if err != nil || len(nodes) > 0 {
+			return true
 		}
-		return len(nodes) > 0, nil
 	}
-	return err == nil, err
-}
-
-// NodeAttachedToLocalPV gets the name of the Node that a local PV has a NodeAffinity to.
-// It assumes that there should be only one matching Node for a local PV and that
-// the local PV follows the form:
-//
-// nodeAffinity:
-//	required:
-//	  nodeSelectorTerms:
-//	  - matchExpressions:
-//	    - key: kubernetes.io/hostname
-//	      operator: In
-//	      values:
-//	      - <node1>
-func NodeAttachedToLocalPV(pv *v1.PersistentVolume) (string, bool) {
-	nodeNames := volumeUtil.GetLocalPersistentVolumeNodeNames(pv)
-	// We assume that there should only be one matching node.
-	if nodeNames == nil || len(nodeNames) != 1 {
-		return "", false
-	}
-	return nodeNames[0], true
+	return false
 }
 
 // IsLocalPVWithStorageClass checks that a PV is a local PV that belongs to any of the passed in StorageClasses.
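
For illustration, AnyNodeExists lets a local PV's affinity carry several
hostnames in one term; a hypothetical two-node PV (node names invented here,
same shape as the removed NodeAttachedToLocalPV comment) would look like:

    nodeAffinity:
      required:
        nodeSelectorTerms:
        - matchExpressions:
          - key: kubernetes.io/hostname
            operator: In
            values:
            - node-1
            - node-2

Such a PV is only treated as orphaned once every listed Node is gone; a
single surviving name is enough for AnyNodeExists to report true.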
diff --git a/pkg/common/common_test.go b/pkg/common/common_test.go index 14896b867..49756f347 100644 --- a/pkg/common/common_test.go +++ b/pkg/common/common_test.go @@ -477,7 +477,7 @@ func TestGetVolumeMode(t *testing.T) { } } -func TestNodeExists(t *testing.T) { +func TestAnyNodeExists(t *testing.T) { nodeName := "test-node" nodeWithName := &v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -495,21 +495,39 @@ func TestNodeExists(t *testing.T) { tests := []struct { nodeAdded *v1.Node // Required. - nodeQueried *v1.Node + nodeQueried []string expectedResult bool }{ { nodeAdded: nodeWithName, - nodeQueried: nodeWithName, + nodeQueried: []string{nodeName}, expectedResult: true, }, { nodeAdded: nodeWithLabel, - nodeQueried: nodeWithName, + nodeQueried: []string{nodeName}, expectedResult: true, }, { - nodeQueried: nodeWithName, + nodeQueried: []string{nodeName}, + expectedResult: false, + }, + { + nodeAdded: nodeWithName, + nodeQueried: []string{"other-node", nodeName}, + expectedResult: true, + }, + { + nodeAdded: nodeWithLabel, + nodeQueried: []string{"other-node", nodeName}, + expectedResult: true, + }, + { + nodeQueried: []string{}, + expectedResult: false, + }, + { + nodeQueried: nil, expectedResult: false, }, } @@ -523,62 +541,13 @@ func TestNodeExists(t *testing.T) { nodeInformer.Informer().GetStore().Add(test.nodeAdded) } - exists, err := NodeExists(nodeInformer.Lister(), test.nodeQueried.Name) - if err != nil { - t.Errorf("Got unexpected error: %s", err.Error()) - } + exists := AnyNodeExists(nodeInformer.Lister(), test.nodeQueried) if exists != test.expectedResult { t.Errorf("expected result: %t, actual: %t", test.expectedResult, exists) } } } -func TestNodeAttachedToLocalPV(t *testing.T) { - nodeName := "testNodeName" - - tests := []struct { - name string - pv *v1.PersistentVolume - expectedNodeName string - expectedStatus bool - }{ - { - name: "NodeAffinity will all necessary fields", - pv: withNodeAffinity(pv(), []string{nodeName}, NodeLabelKey), - expectedNodeName: nodeName, - expectedStatus: true, - }, - { - name: "empty nodeNames array", - pv: withNodeAffinity(pv(), []string{}, NodeLabelKey), - expectedNodeName: "", - expectedStatus: false, - }, - { - name: "multiple nodeNames", - pv: withNodeAffinity(pv(), []string{nodeName, "newNode"}, NodeLabelKey), - expectedNodeName: "", - expectedStatus: false, - }, - { - name: "wrong node label key", - pv: withNodeAffinity(pv(), []string{nodeName}, "wrongLabel"), - expectedNodeName: "", - expectedStatus: false, - }, - } - - for _, test := range tests { - nodeName, ok := NodeAttachedToLocalPV(test.pv) - if ok != test.expectedStatus { - t.Errorf("test: %s, status: %t, expectedStaus: %t", test.name, ok, test.expectedStatus) - } - if nodeName != test.expectedNodeName { - t.Errorf("test: %s, nodeName: %s, expectedNodeName: %s", test.name, nodeName, test.expectedNodeName) - } - } -} - func TestIsLocalPVWithStorageClass(t *testing.T) { tests := []struct { name string diff --git a/pkg/node-cleanup/controller/controller.go b/pkg/node-cleanup/controller/controller.go index d838df359..227546552 100644 --- a/pkg/node-cleanup/controller/controller.go +++ b/pkg/node-cleanup/controller/controller.go @@ -36,6 +36,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + volumeUtil "k8s.io/kubernetes/pkg/volume/util" "sigs.k8s.io/sig-storage-local-static-provisioner/pkg/common" cleanupmetrics "sigs.k8s.io/sig-storage-local-static-provisioner/pkg/metrics/node-cleanup" @@ -196,18 +197,15 @@ func (c *CleanupController) 
syncHandler(ctx context.Context, pvName string) erro return err } - nodeName, ok := common.NodeAttachedToLocalPV(pv) - if !ok { + nodeNames := volumeUtil.GetLocalPersistentVolumeNodeNames(pv) + if nodeNames == nil { // For whatever reason the PV isn't formatted properly so we will // never be able to get its corresponding Node, so ignore. klog.Errorf("error getting node attached to pv: %s", pv) return nil } - nodeExists, err := common.NodeExists(c.nodeLister, nodeName) - if err != nil { - return err - } + nodeExists := common.AnyNodeExists(c.nodeLister, nodeNames) // Check that the node the PV/PVC reference is still deleted if nodeExists { return nil @@ -242,7 +240,7 @@ func (c *CleanupController) syncHandler(ctx context.Context, pvName string) erro } cleanupmetrics.PersistentVolumeClaimDeleteTotal.Inc() - klog.Infof("Deleted PVC %q that pointed to Node %q", pvClaimRef.Name, nodeName) + klog.Infof("Deleted PVC %q that pointed to non-existent Nodes %q", pvClaimRef.Name, nodeNames) return nil } @@ -264,18 +262,13 @@ func (c *CleanupController) startCleanupTimersIfNeeded() { continue } - nodeName, ok := common.NodeAttachedToLocalPV(pv) - if !ok { + nodeNames := volumeUtil.GetLocalPersistentVolumeNodeNames(pv) + if nodeNames == nil { klog.Errorf("error getting node attached to pv: %s", pv) continue } - shouldEnqueue, err := c.shouldEnqueueEntry(pv, nodeName) - if err != nil { - klog.Errorf("error determining whether to enqueue entry with pv %q: %v", pv.Name, err) - continue - } - + shouldEnqueue := c.shouldEnqueueEntry(pv, nodeNames) if shouldEnqueue { klog.Infof("Starting timer for resource deletion, resource:%s, timer duration: %s", pv.Spec.ClaimRef, c.pvcDeletionDelay.String()) c.eventRecorder.Event(pv.Spec.ClaimRef, v1.EventTypeWarning, "ReferencedNodeDeleted", fmt.Sprintf("PVC is tied to a deleted Node. PVC will be cleaned up in %s if the Node doesn't come back", c.pvcDeletionDelay.String())) @@ -288,13 +281,12 @@ func (c *CleanupController) startCleanupTimersIfNeeded() { // shouldEnqueuePV checks if a PV should be enqueued to the entryQueue. // The PV must be a local PV, have a StorageClass present in the list of storageClassNames, have a NodeAffinity // to a deleted Node, and have a PVC bound to it (otherwise there's nothing to clean up). 
-func (c *CleanupController) shouldEnqueueEntry(pv *v1.PersistentVolume, nodeName string) (bool, error) {
+func (c *CleanupController) shouldEnqueueEntry(pv *v1.PersistentVolume, nodeNames []string) bool {
 	if pv.Spec.ClaimRef == nil {
-		return false, nil
+		return false
 	}
 
-	exists, err := common.NodeExists(c.nodeLister, nodeName)
-	return !exists && err == nil, err
+	return !common.AnyNodeExists(c.nodeLister, nodeNames)
 }
 
 // deletePVC deletes the PVC with the given name and namespace
diff --git a/pkg/node-cleanup/deleter/deleter.go b/pkg/node-cleanup/deleter/deleter.go
index 35ac497b8..0c8ee602d 100644
--- a/pkg/node-cleanup/deleter/deleter.go
+++ b/pkg/node-cleanup/deleter/deleter.go
@@ -18,7 +18,6 @@ package deleter
 
 import (
 	"context"
-	"fmt"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
@@ -28,6 +27,7 @@ import (
 	"k8s.io/client-go/kubernetes"
 	corelisters "k8s.io/client-go/listers/core/v1"
 	"k8s.io/klog/v2"
+	volumeUtil "k8s.io/kubernetes/pkg/volume/util"
 
 	"sigs.k8s.io/sig-storage-local-static-provisioner/pkg/common"
 	cleanupmetrics "sigs.k8s.io/sig-storage-local-static-provisioner/pkg/metrics/node-cleanup"
@@ -82,12 +82,7 @@ func (d *Deleter) DeletePVs(ctx context.Context) {
 			continue
 		}
 
-		referencesDeletedNode, err := d.referencesNonExistentNode(pv)
-		if err != nil {
-			klog.Errorf("error determining if pv %q references deleted node: %v", pv.Name, err)
-			continue
-		}
-		if !referencesDeletedNode {
+		if !d.referencesNonExistentNode(pv) {
 			// PV's node is up so PV is not stale
 			continue
 		}
@@ -124,14 +119,13 @@ func (d *Deleter) DeletePVs(ctx context.Context) {
 //	    operator: In
 //	    values:
 //	    - <node1>
-func (d *Deleter) referencesNonExistentNode(localPV *v1.PersistentVolume) (bool, error) {
-	nodeName, ok := common.NodeAttachedToLocalPV(localPV)
-	if !ok {
-		return false, fmt.Errorf("Error retrieving node")
+func (d *Deleter) referencesNonExistentNode(localPV *v1.PersistentVolume) bool {
+	nodeNames := volumeUtil.GetLocalPersistentVolumeNodeNames(localPV)
+	if nodeNames == nil {
+		return false
 	}
 
-	exists, err := common.NodeExists(d.nodeLister, nodeName)
-	return !exists && err == nil, err
+	return !common.AnyNodeExists(d.nodeLister, nodeNames)
 }
 
 func (d *Deleter) deletePV(ctx context.Context, pvName string) error {

From 2729447a13792ce2264774ad9450ed13165a6e6b Mon Sep 17 00:00:00 2001
From: Omar Pakker
Date: Thu, 28 Mar 2024 19:33:30 +0100
Subject: [PATCH 2/2] Allow additional node selector terms to be set

This enables additional node selector terms to be added alongside the
provisioner node term, enabling use cases such as shared disks, since
affinity cannot be changed after the PV has been provisioned.

A limitation of the current implementation is that the provisioner node
term can /not/ be ANDed with additional terms. This change only allows
for additional terms that will be /ORed/ with the provisioner name term.
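
For illustration, a storageClassMap entry could carry an extra selector
term (class and node names here are hypothetical, matching the shape of
the new test case) that is ORed with the provisioner's own hostname term:

    local-storage:
      hostDir: /mnt/disks
      mountDir: /mnt/disks
      selector:
      - matchExpressions:
        - key: "kubernetes.io/hostname"
          operator: "In"
          values:
          - otherNode1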
--- helm/provisioner/templates/configmap.yaml | 10 ++- pkg/common/common.go | 3 + pkg/common/common_test.go | 40 +++++++++ pkg/discovery/discovery.go | 101 +++++++--------------- pkg/discovery/discovery_test.go | 8 +- 5 files changed, 87 insertions(+), 75 deletions(-) diff --git a/helm/provisioner/templates/configmap.yaml b/helm/provisioner/templates/configmap.yaml index 0a6956e76..2a3c2f2d5 100644 --- a/helm/provisioner/templates/configmap.yaml +++ b/helm/provisioner/templates/configmap.yaml @@ -26,7 +26,7 @@ data: {{- end }} {{- if .Values.useJobForCleaning }} useJobForCleaning: "yes" -{{- end}} +{{- end }} {{- if .Values.tolerations }} jobTolerations: | {{ toYaml .Values.tolerations | nindent 4 }} {{- end }} @@ -35,7 +35,7 @@ data: {{- end }} {{- if .Values.minResyncPeriod }} minResyncPeriod: {{ .Values.minResyncPeriod | quote }} -{{- end}} +{{- end }} storageClassMap: | {{- range $classConfig := .Values.classes }} {{ $classConfig.name }}: @@ -45,7 +45,7 @@ data: blockCleanerCommand: {{- range $val := $classConfig.blockCleanerCommand }} - {{ $val | quote }} - {{- end}} + {{- end }} {{- end }} {{- if $classConfig.volumeMode }} volumeMode: {{ $classConfig.volumeMode }} @@ -56,4 +56,8 @@ data: {{- if $classConfig.namePattern }} namePattern: {{ $classConfig.namePattern | quote }} {{- end }} + {{- if $classConfig.selector }} + selector: + {{- toYaml $classConfig.selector | nindent 8 }} + {{- end }} {{- end }} diff --git a/pkg/common/common.go b/pkg/common/common.go index 1115eb6ed..023882c68 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -140,6 +140,9 @@ type MountConfig struct { // NamePattern name pattern check // only discover file name matching pattern("*" by default) NamePattern string `json:"namePattern" yaml:"namePattern"` + // Additional selector terms to set for node affinity in addition to the provisioner node name. + // Useful for shared disks as affinity can not be changed after provisioning the PV. 
+ Selector []v1.NodeSelectorTerm `json:"selector" yaml:"selector"` } // RuntimeConfig stores all the objects that the provisioner needs to run diff --git a/pkg/common/common_test.go b/pkg/common/common_test.go index 49756f347..3dee6afcc 100644 --- a/pkg/common/common_test.go +++ b/pkg/common/common_test.go @@ -236,6 +236,46 @@ func TestLoadProvisionerConfigs(t *testing.T) { }, nil, }, + { + map[string]string{"storageClassMap": `local-storage: + hostDir: /mnt/disks + mountDir: /mnt/disks + selector: + - matchExpressions: + - key: "kubernetes.io/hostname" + operator: "In" + values: + - otherNode1 +`, + }, + ProvisionerConfiguration{ + StorageClassConfig: map[string]MountConfig{ + "local-storage": { + HostDir: "/mnt/disks", + MountDir: "/mnt/disks", + BlockCleanerCommand: []string{"/scripts/quick_reset.sh"}, + VolumeMode: "Filesystem", + NamePattern: "*", + Selector: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "kubernetes.io/hostname", + Operator: v1.NodeSelectorOpIn, + Values: []string{"otherNode1"}, + }, + }, + }, + }, + }, + }, + UseAlphaAPI: true, + MinResyncPeriod: metav1.Duration{ + Duration: time.Hour + time.Minute*30, + }, + }, + nil, + }, } for _, v := range testcases { for name, value := range v.data { diff --git a/pkg/discovery/discovery.go b/pkg/discovery/discovery.go index ba676f5d8..ba76f13ff 100644 --- a/pkg/discovery/discovery.go +++ b/pkg/discovery/discovery.go @@ -23,6 +23,7 @@ import ( "hash/fnv" "net/http" "path/filepath" + "slices" "strings" "sync" "time" @@ -46,11 +47,10 @@ type Discoverer struct { Labels map[string]string // ProcTable is a reference to running processes so that we can prevent PV from being created while // it is being cleaned - CleanupTracker *deleter.CleanupStatusTracker - nodeAffinityAnn string - nodeAffinity *v1.VolumeNodeAffinity - classLister storagev1listers.StorageClassLister - ownerReference *metav1.OwnerReference + CleanupTracker *deleter.CleanupStatusTracker + nodeSelector *v1.NodeSelector + classLister storagev1listers.StorageClassLister + ownerReference *metav1.OwnerReference Readyz *readyzCheck } @@ -106,30 +106,9 @@ func NewDiscoverer(config *common.RuntimeConfig, cleanupTracker *deleter.Cleanup return nil, fmt.Errorf("Failed to generate owner reference: %v", err) } - if config.UseAlphaAPI { - nodeAffinity, err := generateNodeAffinity(config.Node) - if err != nil { - return nil, fmt.Errorf("Failed to generate node affinity: %v", err) - } - tmpAnnotations := map[string]string{} - err = StorageNodeAffinityToAlphaAnnotation(tmpAnnotations, nodeAffinity) - if err != nil { - return nil, fmt.Errorf("Failed to convert node affinity to alpha annotation: %v", err) - } - return &Discoverer{ - RuntimeConfig: config, - Labels: labelMap, - CleanupTracker: cleanupTracker, - classLister: sharedInformer.Lister(), - nodeAffinityAnn: tmpAnnotations[common.AlphaStorageNodeAffinityAnnotation], - ownerReference: ownerRef, - Readyz: &readyzCheck{}, - }, nil - } - - volumeNodeAffinity, err := generateVolumeNodeAffinity(config.Node) + nodeSelector, err := generateNodeSelector(config.Node) if err != nil { - return nil, fmt.Errorf("Failed to generate volume node affinity: %v", err) + return nil, fmt.Errorf("Failed to generate node selector: %v", err) } return &Discoverer{ @@ -137,7 +116,7 @@ func NewDiscoverer(config *common.RuntimeConfig, cleanupTracker *deleter.Cleanup Labels: labelMap, CleanupTracker: cleanupTracker, classLister: sharedInformer.Lister(), - nodeAffinity: volumeNodeAffinity, + nodeSelector: nodeSelector, 
ownerReference: ownerRef, Readyz: &readyzCheck{}, }, nil @@ -160,7 +139,7 @@ func generateOwnerReference(node *v1.Node) (*metav1.OwnerReference, error) { }, nil } -func generateNodeAffinity(node *v1.Node) (*v1.NodeAffinity, error) { +func generateNodeSelector(node *v1.Node) (*v1.NodeSelector, error) { if node.Labels == nil { return nil, fmt.Errorf("Node does not have labels") } @@ -169,42 +148,14 @@ func generateNodeAffinity(node *v1.Node) (*v1.NodeAffinity, error) { return nil, fmt.Errorf("Node does not have expected label %s", common.NodeLabelKey) } - return &v1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: common.NodeLabelKey, - Operator: v1.NodeSelectorOpIn, - Values: []string{nodeValue}, - }, - }, - }, - }, - }, - }, nil -} - -func generateVolumeNodeAffinity(node *v1.Node) (*v1.VolumeNodeAffinity, error) { - if node.Labels == nil { - return nil, fmt.Errorf("Node does not have labels") - } - nodeValue, found := node.Labels[common.NodeLabelKey] - if !found { - return nil, fmt.Errorf("Node does not have expected label %s", common.NodeLabelKey) - } - - return &v1.VolumeNodeAffinity{ - Required: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: common.NodeLabelKey, - Operator: v1.NodeSelectorOpIn, - Values: []string{nodeValue}, - }, + return &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: common.NodeLabelKey, + Operator: v1.NodeSelectorOpIn, + Values: []string{nodeValue}, }, }, }, @@ -437,11 +388,25 @@ func (d *Discoverer) createPV(file, class string, reclaimPolicy v1.PersistentVol OwnerReference: d.ownerReference, } + volumeNodeSelector := &v1.NodeSelector{ + NodeSelectorTerms: slices.Concat(d.nodeSelector.NodeSelectorTerms, config.Selector), + } + if d.UseAlphaAPI { + nodeAffinity := &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: volumeNodeSelector, + } + tmpAnnotations := map[string]string{} + err := StorageNodeAffinityToAlphaAnnotation(tmpAnnotations, nodeAffinity) + if err != nil { + return fmt.Errorf("error converting volume affinity to alpha annotation: %v", err) + } localPVConfig.UseAlphaAPI = true - localPVConfig.AffinityAnn = d.nodeAffinityAnn + localPVConfig.AffinityAnn = tmpAnnotations[common.AlphaStorageNodeAffinityAnnotation] } else { - localPVConfig.NodeAffinity = d.nodeAffinity + localPVConfig.NodeAffinity = &v1.VolumeNodeAffinity{ + Required: volumeNodeSelector, + } } if config.FsType != "" { diff --git a/pkg/discovery/discovery_test.go b/pkg/discovery/discovery_test.go index 55fbdf3a6..30e94950e 100644 --- a/pkg/discovery/discovery_test.go +++ b/pkg/discovery/discovery_test.go @@ -753,16 +753,16 @@ func TestUseAlphaAPI(t *testing.T) { if d.UseAlphaAPI { t.Fatal("UseAlphaAPI should be false") } - if len(d.nodeAffinityAnn) != 0 || d.nodeAffinity == nil { - t.Fatal("the value nodeAffinityAnn shouldn't be set while nodeAffinity should") + if d.nodeSelector == nil { + t.Fatal("the value nodeSelector should be set") } d = testSetup(t, test, true, false) if !d.UseAlphaAPI { t.Fatal("UseAlphaAPI should be true") } - if d.nodeAffinity != nil || len(d.nodeAffinityAnn) == 0 { - t.Fatal("the value nodeAffinityAnn should be set while nodeAffinity should not") + if d.nodeSelector == nil { + t.Fatal("the value nodeSelector should be set") } }
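
For illustration, with a selector term like the one configured above, a PV
discovered on a provisioner node labelled kubernetes.io/hostname=node-a
(a hypothetical name) would be created with both terms, which NodeSelector
semantics treat as ORed:

    nodeAffinity:
      required:
        nodeSelectorTerms:
        - matchExpressions:
          - key: kubernetes.io/hostname
            operator: In
            values:
            - node-a
        - matchExpressions:
          - key: kubernetes.io/hostname
            operator: In
            values:
            - otherNode1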