Skip to content

Commit

Permalink
Support for NoExecute taint, pod affinity, and bug fix volume attachm…
Browse files Browse the repository at this point in the history
…ent delete (#102)

* Changes for podAffinity, retry of delete volume attachments error, and processing SkipArrayConnectionCheck only on a node noexec taint.

* Add rebalance changes to nway test script; add bounce.kubelet as alternate way to kill node

* Add rebalance.sh which rebalances nodes with scheduler imbalance (some nodes with 110 pods)

* Fix prohibited word in grep describing type of k8s node

* set replicas for unity

* re-enable nodeMonitorHandler

* Add pod topology spread to helm chart and update storage class for unity-nfs
  • Loading branch information
rbo54 authored Jan 26, 2022
1 parent 9b44468 commit 4f57914
Show file tree
Hide file tree
Showing 12 changed files with 728 additions and 75 deletions.
2 changes: 1 addition & 1 deletion internal/k8sapi/k8sapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (api *Client) GetCachedVolumeAttachment(ctx context.Context, pvName, nodeNa
vaKey := fmt.Sprintf("%s/%s", *va.Spec.Source.PersistentVolumeName, va.Spec.NodeName)
api.volumeAttachmentCache[vaKey] = vaCopy
api.volumeAttachmentNameToKey[vaCopy.ObjectMeta.Name] = vaKey
log.Infof("Adding VA Cache %s %s", vaCopy.ObjectMeta.Name, vaKey)
log.Debugf("Adding VA Cache %s %s", vaCopy.ObjectMeta.Name, vaKey)
}
}
return api.volumeAttachmentCache[key], nil
Expand Down
194 changes: 159 additions & 35 deletions internal/monitor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,31 @@ import (
"sync"
"time"

"podmon/internal/k8sapi"

"github.com/container-storage-interface/spec/lib/go/csi"
csiext "github.com/dell/dell-csi-extensions/podmon"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/watch"
"podmon/internal/k8sapi"
)

// MaxCrashLoopBackOffRetry is the maximum number of times for a pod to be deleted in response to a CrashLoopBackOff
const MaxCrashLoopBackOffRetry = 5

// ControllerPodInfo holds the information the controller keeps on hand about a pod
// for tracking the health of the system.
type ControllerPodInfo struct { // per-pod record kept by the controller
PodKey string // the Pod Key (namespace/name) of the pod
Node *v1.Node // the associated node structure
PodUID string // the pod's UID (taken from pod.ObjectMeta.UID)
ArrayIDs []string // IDs of the arrays used by the pod's volumes
PodKey string // the Pod Key (namespace/name) of the pod
Node *v1.Node // the associated node structure
PodUID string // the pod's UID (taken from pod.ObjectMeta.UID)
ArrayIDs []string // IDs of the arrays used by the pod's volumes
PodAffinityLabels map[string]string // pod affinity labels (label key -> value) collected for the pod
}

const (
	// notFound is the substring searched for in an error's text to recognize
	// a "not found" failure (e.g. when deleting a VolumeAttachment).
	notFound = "not found"

	// hostNameTopologyKey is the topology key that pod affinity terms must
	// carry for their label selectors to be considered.
	hostNameTopologyKey = "kubernetes.io/hostname"
)

// controllerModePodHandler handles controller mode functionality when a pod event happens
func (cm *PodMonitorType) controllerModePodHandler(pod *v1.Pod, eventType watch.EventType) error {
log.Debugf("podMonitorHandler-controller: name %s/%s node %s message %s reason %s event %v",
Expand Down Expand Up @@ -106,12 +111,17 @@ func (cm *PodMonitorType) controllerModePodHandler(pod *v1.Pod, eventType watch.
if err != nil {
log.Errorf("Could not determine pod to arrayIDs: %s", err)
}
podAffinityLabels := cm.getPodAffinityLabels(pod)
if len(podAffinityLabels) > 0 {
log.Infof("podKey %s podAffinityLabels %v", podKey, podAffinityLabels)
}
podUID := string(pod.ObjectMeta.UID)
podInfo := &ControllerPodInfo{
PodKey: podKey,
Node: node,
PodUID: podUID,
ArrayIDs: arrayIDs,
PodKey: podKey,
Node: node,
PodUID: podUID,
ArrayIDs: arrayIDs,
PodAffinityLabels: podAffinityLabels,
}
cm.PodKeyToControllerPodInfo.Store(podKey, podInfo)
// Delete (reset) the CrashLoopBackOff counter since we're running.
Expand All @@ -122,7 +132,7 @@ func (cm *PodMonitorType) controllerModePodHandler(pod *v1.Pod, eventType watch.
pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Spec.NodeName, initialized, ready, taintnosched, taintnoexec, taintpodmon)
// TODO: option for taintnosched vs. taintnoexec
if (taintnoexec || taintnosched || taintpodmon) && !ready {
go cm.controllerCleanupPod(pod, node, "NodeFailure", taintpodmon)
go cm.controllerCleanupPod(pod, node, "NodeFailure", taintnoexec, taintpodmon)
} else if !ready && crashLoopBackOff {
cnt, _ := cm.PodKeyToCrashLoopBackOffCount.LoadOrStore(podKey, 0)
crashLoopBackOffCount := cnt.(int)
Expand All @@ -144,11 +154,12 @@ func (cm *PodMonitorType) controllerModePodHandler(pod *v1.Pod, eventType watch.
}

// Attempts to cleanup a Pod that is in trouble. Returns true if made it all the way to deleting the pod.
func (cm *PodMonitorType) controllerCleanupPod(pod *v1.Pod, node *v1.Node, reason string, taintpodmon bool) bool {
func (cm *PodMonitorType) controllerCleanupPod(pod *v1.Pod, node *v1.Node, reason string, taintnoexec, taintpodmon bool) bool {
fields := make(map[string]interface{})
fields["namespace"] = pod.ObjectMeta.Namespace
fields["pod"] = pod.ObjectMeta.Name
fields["node"] = node.ObjectMeta.Name
fields["reason"] = reason
// Lock so that only one thread is processing pod at a time
podKey := getPodKey(pod)
// Single thread processing of this pod
Expand Down Expand Up @@ -205,12 +216,17 @@ func (cm *PodMonitorType) controllerCleanupPod(pod *v1.Pod, node *v1.Node, reaso
}

// Call the driver to validate the volumes are not in use
if cm.CSIExtensionsPresent && !cm.SkipArrayConnectionValidation {
if CSIApi.Connected() {
log.WithFields(fields).Infof("Validating host connectivity for node %s volumes %v", node.ObjectMeta.Name, volIDs)
connected, iosInProgress, err := cm.callValidateVolumeHostConnectivity(node, volIDs, true)
// Don't consider connected status if taintpodmon is set, because the node may just have come back online.
if (connected && !taintpodmon) || iosInProgress || err != nil {
if cm.CSIExtensionsPresent && CSIApi.Connected() {
log.WithFields(fields).Infof("Validating host connectivity for node %s volumes %v", node.ObjectMeta.Name, volIDs)
connected, iosInProgress, err := cm.callValidateVolumeHostConnectivity(node, volIDs, true)
// Don't consider connected status if taintpodmon is set, because the node may just have come back online.
if (connected && !taintpodmon) || iosInProgress || err != nil {
fields["connected"] = connected
fields["iosInProgress"] = iosInProgress
// If SkipArrayConnectionValidation and taintnoexec are set, proceed anyway
if cm.SkipArrayConnectionValidation && taintnoexec {
log.WithFields(fields).Info("SkipArrayConnectionValidation is set and taintnoexec is true- proceeding")
} else {
log.WithFields(fields).Info("Aborting pod cleanup because array still connected and/or recently did I/O")
if err = K8sAPI.CreateEvent(podmon, pod, k8sapi.EventTypeWarning, reason,
"podmon aborted pod cleanup %s array connected or recent I/O",
Expand All @@ -221,7 +237,7 @@ func (cm *PodMonitorType) controllerCleanupPod(pod *v1.Pod, node *v1.Node, reaso
}
}
} else {
log.WithFields(fields).Infof("Skipped array connection validation")
log.WithFields(fields).Error("Array validation check skipped because CSIApi not connected")
}

// Fence all the volumes
Expand Down Expand Up @@ -255,8 +271,11 @@ func (cm *PodMonitorType) controllerCleanupPod(pod *v1.Pod, node *v1.Node, reaso
for _, vaName := range vaNamesToDelete {
err = K8sAPI.DeleteVolumeAttachment(ctx, vaName)
if err != nil {
log.WithFields(fields).Errorf("Couldn't delete VolumeAttachment: %s", vaName)
return false
err = K8sAPI.DeleteVolumeAttachment(ctx, vaName)
if err != nil && !strings.Contains(err.Error(), notFound) {
log.WithFields(fields).Errorf("Couldn't delete VolumeAttachment- aborting after retry: %s: %s", vaName, err.Error())
return false
}
}
}

Expand Down Expand Up @@ -354,17 +373,19 @@ func (cm *PodMonitorType) podToArrayIDs(pod *v1.Pod) ([]string, error) {
// If connectivity is lost, will initiate cleanup of the pods.
// This is a never ending function, intended to be called as Go routine.
func (cm *PodMonitorType) ArrayConnectivityMonitor() {

// Loop through all the monitored Pods making sure they still have array access
for {
podKeysToClean := make([]string, 0)
nodesToTaint := make(map[string]bool)

// Clear the connectivity cache so it will sample again.
connectivityCache.ResetSampled()
// Internal function for iterating PodKeyToControllerPodInfo
// This will clean up Pods that have lost connectivity to at least one of their arrays
fnPodKeyToControllerPodInfo := func(key, value interface{}) bool {
controllerPodInfo := value.(*ControllerPodInfo)
podKey := controllerPodInfo.PodKey
podNamespace, podName := splitPodKey(podKey)
podUID := controllerPodInfo.PodUID
node := controllerPodInfo.Node

// Check if we have connectivity for all our array ids
Expand All @@ -377,23 +398,53 @@ func (cm *PodMonitorType) ArrayConnectivityMonitor() {
}
}
if !connected {
// Fetch the pod.
ctx, cancel := K8sAPI.GetContext(MediumTimeout)
defer cancel()
pod, err := K8sAPI.GetPod(ctx, podNamespace, podName)
if err == nil {
if string(pod.ObjectMeta.UID) == podUID && pod.Spec.NodeName == node.ObjectMeta.Name {
log.Infof("Cleaning up pod %s/%s because of array connectivity loss", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
cm.controllerCleanupPod(pod, node, "ArrayConnectionLost", false)
} else {
log.Infof("Skipping pod %s/%s podUID %s %s node %s %s", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name,
string(pod.ObjectMeta.UID), podUID, pod.Spec.NodeName, node.ObjectMeta.Name)
}
}
nodesToTaint[node.ObjectMeta.Name] = true
podKeysToClean = append(podKeysToClean, podKey)
}
return true
}

// Process all the pods, generating the associated connectivity cache entries
cm.PodKeyToControllerPodInfo.Range(fnPodKeyToControllerPodInfo)

// Taint all the nodes that were not connected
for nodeName := range nodesToTaint {
log.Infof("Tainting node %s because of connectivity loss", nodeName)
err := taintNode(nodeName, false)
if err != nil {
log.Errorf("Unable to taint node: %s: %s", nodeName, err.Error())
}
}

// Cleanup pods that are on the tainted nodes.
for _, podKey := range podKeysToClean {
// Fetch the pod.
info, ok := cm.PodKeyToControllerPodInfo.Load(podKey)
if !ok {
continue
}
podInfo := info.(*ControllerPodInfo)
if len(podInfo.PodAffinityLabels) > 0 {
// Process all the pods with affinity together
log.Infof("Processing pods with affinity %v", podInfo.PodAffinityLabels)
for _, podKey := range podKeysToClean {
// Fetch the pod.
infox, ok := cm.PodKeyToControllerPodInfo.Load(podKey)
if !ok {
continue
}
podInfox := infox.(*ControllerPodInfo)
if mapEqualsMap(podInfo.PodAffinityLabels, podInfox.PodAffinityLabels) {
cm.ProcessPodInfoForCleanup(podInfox, "ArrayConnectivityLoss")
}
}
log.Infof("End Processing pods with affinity %v", podInfo.PodAffinityLabels)
} else {
cm.ProcessPodInfoForCleanup(podInfo, "ArrayConnectivityLoss")
}
}

// Sleep according to the NODE_CONNECTIVITY_POLL_RATE
pollRate := GetArrayConnectivityPollRate()
time.Sleep(pollRate)
if pollRate < 10*time.Millisecond {
Expand All @@ -403,6 +454,23 @@ func (cm *PodMonitorType) ArrayConnectivityMonitor() {
}
}

// ProcessPodInfoForCleanup processes a ControllerPodInfo for cleanup.
//
// It re-fetches the pod named by podInfo.PodKey and, only when the live pod still has the
// recorded UID and is still assigned to the recorded node, calls controllerCleanupPod with
// the supplied reason (taintnoexec and taintpodmon are both passed as false).
// If the pod cannot be fetched, or it no longer matches the recorded UID/node, no cleanup
// is attempted.
func (cm *PodMonitorType) ProcessPodInfoForCleanup(podInfo *ControllerPodInfo, reason string) {
	podNamespace, podName := splitPodKey(podInfo.PodKey)
	ctx, cancel := K8sAPI.GetContext(MediumTimeout)
	defer cancel()

	pod, err := K8sAPI.GetPod(ctx, podNamespace, podName)
	if err != nil {
		// Best effort: the pod may already be gone. Record why nothing was done instead of
		// dropping the error silently.
		log.Debugf("Not cleaning up pod %s for %s, could not fetch pod: %s", podInfo.PodKey, reason, err.Error())
		return
	}

	// Guard against acting on a different incarnation of the pod (same name, new UID) or on a
	// pod that has since moved to another node.
	if string(pod.ObjectMeta.UID) != podInfo.PodUID || pod.Spec.NodeName != podInfo.Node.ObjectMeta.Name {
		log.Infof("Skipping pod %s/%s podUID %s %s node %s %s", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name,
			string(pod.ObjectMeta.UID), podInfo.PodUID, pod.Spec.NodeName, podInfo.Node.ObjectMeta.Name)
		return
	}

	// Arguments follow the placeholder order: namespace, name, then reason.
	log.Infof("Cleaning up pod %s/%s because of %s", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, reason)
	cm.controllerCleanupPod(pod, podInfo.Node, reason, false, false)
}

type nodeArrayConnectivityCache struct {
initOnce sync.Once // Will be set after initialization
nodeArrayConnectivitySampled map[string]bool // If true, already sampled, if need to call array to verify connectivity
Expand Down Expand Up @@ -500,3 +568,59 @@ func nodeHasTaint(node *v1.Node, key string, taintEffect v1.TaintEffect) bool {
}
return false
}

// getPodAffinityLabels collects the labels of the pods that the specified pod is required to
// have affinity with.
//
// Only RequiredDuringSchedulingIgnoredDuringExecution pod affinity terms whose topology key is
// hostNameTopologyKey and that carry a label selector are considered. From each such selector
// it takes every MatchLabels entry, plus one entry per "In" match expression (key -> the last
// value listed in the expression).
//
// The returned map is never nil; it is empty when the pod specifies no such pod affinity.
func (cm *PodMonitorType) getPodAffinityLabels(pod *v1.Pod) map[string]string {
	labels := make(map[string]string)
	if pod.Spec.Affinity == nil || pod.Spec.Affinity.PodAffinity == nil {
		return labels
	}
	// Ranging over a nil slice is a no-op, so no separate nil check is needed here.
	for _, term := range pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution {
		selector := term.LabelSelector
		if term.TopologyKey != hostNameTopologyKey || selector == nil {
			continue
		}
		for key, value := range selector.MatchLabels {
			labels[key] = value
		}
		for _, expr := range selector.MatchExpressions {
			if expr.Operator != "In" {
				continue
			}
			// A map holds a single value per key, so when an expression lists several
			// values the last one is the one that is retained.
			if count := len(expr.Values); count > 0 {
				labels[expr.Key] = expr.Values[count-1]
			}
		}
	}
	return labels
}

// mapEqualsMap reports whether map1 and map2 hold exactly the same key/value pairs.
// Nil and empty maps are treated alike, so two maps with no entries are equal.
func mapEqualsMap(map1, map2 map[string]string) bool {
	equal := len(map1) == len(map2)
	if equal {
		// With equal sizes, every entry of map1 being present in map2 implies equality.
		for key, want := range map1 {
			got, present := map2[key]
			if present && got == want {
				continue
			}
			equal = false
			break
		}
	}
	return equal
}
Loading

0 comments on commit 4f57914

Please sign in to comment.