Support for NoExecute taint, pod affinity, and bug fix volume attachment delete #102

Merged: 8 commits, Jan 26, 2022
internal/k8sapi/k8sapi.go: 2 changes (1 addition, 1 deletion)
@@ -127,7 +127,7 @@ func (api *Client) GetCachedVolumeAttachment(ctx context.Context, pvName, nodeNa
vaKey := fmt.Sprintf("%s/%s", *va.Spec.Source.PersistentVolumeName, va.Spec.NodeName)
api.volumeAttachmentCache[vaKey] = vaCopy
api.volumeAttachmentNameToKey[vaCopy.ObjectMeta.Name] = vaKey
log.Infof("Adding VA Cache %s %s", vaCopy.ObjectMeta.Name, vaKey)
log.Debugf("Adding VA Cache %s %s", vaCopy.ObjectMeta.Name, vaKey)
}
}
return api.volumeAttachmentCache[key], nil
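
The hunk above is part of a small two-way index: volumeAttachmentCache is keyed by a "pvName/nodeName" string, and volumeAttachmentNameToKey maps a VolumeAttachment's object name back to that key. A toy sketch of the scheme, with every name made up and the VA object elided to a string:

package main

import "fmt"

func main() {
    vaCache := map[string]string{}   // keyed "pvName/nodeName"
    nameToKey := map[string]string{} // VA object name -> cache key
    vaKey := fmt.Sprintf("%s/%s", "pvc-0123", "worker-node-1")
    vaCache[vaKey] = "csi-va-abc"
    nameToKey["csi-va-abc"] = vaKey
    fmt.Println(vaCache, nameToKey)
}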
internal/monitor/controller.go: 194 changes (159 additions, 35 deletions)
@@ -20,26 +20,31 @@ import (
"sync"
"time"

"podmon/internal/k8sapi"

"github.com/container-storage-interface/spec/lib/go/csi"
csiext "github.com/dell/dell-csi-extensions/podmon"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/watch"
"podmon/internal/k8sapi"
)

// MaxCrashLoopBackOffRetry is the maximum number of times a pod will be deleted in response to a CrashLoopBackOff
const MaxCrashLoopBackOffRetry = 5

// ControllerPodInfo has information for tracking health of the system
type ControllerPodInfo struct { // information controller keeps on hand about a pod
-    PodKey string // the Pod Key (namespace/name) of the pod
-    Node *v1.Node // the associated node structure
-    PodUID string // the pod container's UID
-    ArrayIDs []string // string of array IDs used by the pod's volumes
+    PodKey            string            // the Pod Key (namespace/name) of the pod
+    Node              *v1.Node          // the associated node structure
+    PodUID            string            // the pod container's UID
+    ArrayIDs          []string          // array IDs used by the pod's volumes
+    PodAffinityLabels map[string]string // map of pod affinity labels for the pod
}

+const notFound = "not found"
+const hostNameTopologyKey = "kubernetes.io/hostname"

// controllerModePodHandler handles controller mode functionality when a pod event happens
func (cm *PodMonitorType) controllerModePodHandler(pod *v1.Pod, eventType watch.EventType) error {
log.Debugf("podMonitorHandler-controller: name %s/%s node %s message %s reason %s event %v",
@@ -106,12 +111,17 @@ func (cm *PodMonitorType) controllerModePodHandler(pod *v1.Pod, eventType watch.
if err != nil {
log.Errorf("Could not determine pod to arrayIDs: %s", err)
}
+    podAffinityLabels := cm.getPodAffinityLabels(pod)
+    if len(podAffinityLabels) > 0 {
+        log.Infof("podKey %s podAffinityLabels %v", podKey, podAffinityLabels)
+    }
podUID := string(pod.ObjectMeta.UID)
podInfo := &ControllerPodInfo{
-    PodKey: podKey,
-    Node: node,
-    PodUID: podUID,
-    ArrayIDs: arrayIDs,
+    PodKey:            podKey,
+    Node:              node,
+    PodUID:            podUID,
+    ArrayIDs:          arrayIDs,
+    PodAffinityLabels: podAffinityLabels,
}
cm.PodKeyToControllerPodInfo.Store(podKey, podInfo)
// Delete (reset) the CrashLoopBackOff counter since we're running.
@@ -122,7 +132,7 @@ func (cm *PodMonitorType) controllerModePodHandler(pod *v1.Pod, eventType watch.
pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Spec.NodeName, initialized, ready, taintnosched, taintnoexec, taintpodmon)
// TODO: option for taintnosched vs. taintnoexec
if (taintnoexec || taintnosched || taintpodmon) && !ready {
go cm.controllerCleanupPod(pod, node, "NodeFailure", taintpodmon)
go cm.controllerCleanupPod(pod, node, "NodeFailure", taintnoexec, taintpodmon)
} else if !ready && crashLoopBackOff {
cnt, _ := cm.PodKeyToCrashLoopBackOffCount.LoadOrStore(podKey, 0)
crashLoopBackOffCount := cnt.(int)
@@ -144,11 +154,12 @@ func (cm *PodMonitorType) controllerModePodHandler(pod *v1.Pod, eventType watch.
}

// Attempts to clean up a Pod that is in trouble. Returns true if it made it all the way to deleting the pod.
-func (cm *PodMonitorType) controllerCleanupPod(pod *v1.Pod, node *v1.Node, reason string, taintpodmon bool) bool {
+func (cm *PodMonitorType) controllerCleanupPod(pod *v1.Pod, node *v1.Node, reason string, taintnoexec, taintpodmon bool) bool {
fields := make(map[string]interface{})
fields["namespace"] = pod.ObjectMeta.Namespace
fields["pod"] = pod.ObjectMeta.Name
fields["node"] = node.ObjectMeta.Name
fields["reason"] = reason
// Lock so that only one thread is processing the pod at a time
podKey := getPodKey(pod)
// Single thread processing of this pod
@@ -205,12 +216,17 @@ func (cm *PodMonitorType) controllerCleanupPod(pod *v1.Pod, node *v1.Node, reaso
}

// Call the driver to validate the volumes are not in use
if cm.CSIExtensionsPresent && !cm.SkipArrayConnectionValidation {
if CSIApi.Connected() {
log.WithFields(fields).Infof("Validating host connectivity for node %s volumes %v", node.ObjectMeta.Name, volIDs)
connected, iosInProgress, err := cm.callValidateVolumeHostConnectivity(node, volIDs, true)
// Don't consider connected status if taintpodmon is set, because the node may just have come back online.
if (connected && !taintpodmon) || iosInProgress || err != nil {
if cm.CSIExtensionsPresent && CSIApi.Connected() {
log.WithFields(fields).Infof("Validating host connectivity for node %s volumes %v", node.ObjectMeta.Name, volIDs)
connected, iosInProgress, err := cm.callValidateVolumeHostConnectivity(node, volIDs, true)
// Don't consider connected status if taintpodmon is set, because the node may just have come back online.
if (connected && !taintpodmon) || iosInProgress || err != nil {
fields["connected"] = connected
fields["iosInProgress"] = iosInProgress
// If SkipArrayConnectionValidation and taintnoexec are set, proceed anyway
if cm.SkipArrayConnectionValidation && taintnoexec {
log.WithFields(fields).Info("SkipArrayConnectionValidation is set and taintnoexec is true- proceeding")
} else {
log.WithFields(fields).Info("Aborting pod cleanup because array still connected and/or recently did I/O")
if err = K8sAPI.CreateEvent(podmon, pod, k8sapi.EventTypeWarning, reason,
"podmon aborted pod cleanup %s array connected or recent I/O",
@@ -221,7 +237,7 @@ func (cm *PodMonitorType) controllerCleanupPod(pod *v1.Pod, node *v1.Node, reaso
}
}
} else {
log.WithFields(fields).Infof("Skipped array connection validation")
log.WithFields(fields).Error("Array validation check skipped because CSIApi not connected")
}

// Fence all the volumes
Expand Down Expand Up @@ -255,8 +271,11 @@ func (cm *PodMonitorType) controllerCleanupPod(pod *v1.Pod, node *v1.Node, reaso
for _, vaName := range vaNamesToDelete {
err = K8sAPI.DeleteVolumeAttachment(ctx, vaName)
if err != nil {
log.WithFields(fields).Errorf("Couldn't delete VolumeAttachment: %s", vaName)
return false
err = K8sAPI.DeleteVolumeAttachment(ctx, vaName)
if err != nil && !strings.Contains(err.Error(), notFound) {
log.WithFields(fields).Errorf("Couldn't delete VolumeAttachment- aborting after retry: %s: %s", vaName, err.Error())
return false
}
}
}

@@ -354,17 +373,19 @@ func (cm *PodMonitorType) podToArrayIDs(pod *v1.Pod) ([]string, error) {
// If connectivity is lost, it will initiate cleanup of the pods.
// This is a never-ending function, intended to be run as a goroutine.
func (cm *PodMonitorType) ArrayConnectivityMonitor() {

// Loop through all the monitored Pods making sure they still have array access
for {
+        podKeysToClean := make([]string, 0)
+        nodesToTaint := make(map[string]bool)

// Clear the connectivity cache so it will sample again.
connectivityCache.ResetSampled()
// Internal function for iterating PodKeyToControllerPodInfo
// This will clean up Pods that have lost connectivity to at least one of their arrays
fnPodKeyToControllerPodInfo := func(key, value interface{}) bool {
controllerPodInfo := value.(*ControllerPodInfo)
podKey := controllerPodInfo.PodKey
podNamespace, podName := splitPodKey(podKey)
podUID := controllerPodInfo.PodUID
node := controllerPodInfo.Node

// Check if we have connectivity for all our array ids
@@ -377,23 +398,53 @@ func (cm *PodMonitorType) ArrayConnectivityMonitor() {
}
}
if !connected {
-        // Fetch the pod.
-        ctx, cancel := K8sAPI.GetContext(MediumTimeout)
-        defer cancel()
-        pod, err := K8sAPI.GetPod(ctx, podNamespace, podName)
-        if err == nil {
-            if string(pod.ObjectMeta.UID) == podUID && pod.Spec.NodeName == node.ObjectMeta.Name {
-                log.Infof("Cleaning up pod %s/%s because of array connectivity loss", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
-                cm.controllerCleanupPod(pod, node, "ArrayConnectionLost", false)
-            } else {
-                log.Infof("Skipping pod %s/%s podUID %s %s node %s %s", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name,
-                    string(pod.ObjectMeta.UID), podUID, pod.Spec.NodeName, node.ObjectMeta.Name)
-            }
-        }
+        nodesToTaint[node.ObjectMeta.Name] = true
+        podKeysToClean = append(podKeysToClean, podKey)
}
return true
}

// Process all the pods, generating the associated connectivity cache entries
cm.PodKeyToControllerPodInfo.Range(fnPodKeyToControllerPodInfo)

+        // Taint all the nodes that were not connected
+        for nodeName := range nodesToTaint {
+            log.Infof("Tainting node %s because of connectivity loss", nodeName)
+            err := taintNode(nodeName, false)
+            if err != nil {
+                log.Errorf("Unable to taint node: %s: %s", nodeName, err.Error())
+            }
+        }
+
+        // Cleanup pods that are on the tainted nodes.
+        for _, podKey := range podKeysToClean {
+            // Fetch the pod info.
+            info, ok := cm.PodKeyToControllerPodInfo.Load(podKey)
+            if !ok {
+                continue
+            }
+            podInfo := info.(*ControllerPodInfo)
+            if len(podInfo.PodAffinityLabels) > 0 {
+                // Process all the pods with the same affinity labels together
+                log.Infof("Processing pods with affinity %v", podInfo.PodAffinityLabels)
+                for _, podKey := range podKeysToClean {
+                    // Fetch the pod info.
+                    infox, ok := cm.PodKeyToControllerPodInfo.Load(podKey)
+                    if !ok {
+                        continue
+                    }
+                    podInfox := infox.(*ControllerPodInfo)
+                    if mapEqualsMap(podInfo.PodAffinityLabels, podInfox.PodAffinityLabels) {
+                        cm.ProcessPodInfoForCleanup(podInfox, "ArrayConnectivityLoss")
+                    }
+                }
+                log.Infof("End processing pods with affinity %v", podInfo.PodAffinityLabels)
+            } else {
+                cm.ProcessPodInfoForCleanup(podInfo, "ArrayConnectivityLoss")
+            }
+        }

// Sleep according to the NODE_CONNECTIVITY_POLL_RATE
pollRate := GetArrayConnectivityPollRate()
time.Sleep(pollRate)
if pollRate < 10*time.Millisecond {
@@ -403,6 +454,23 @@ func (cm *PodMonitorType) ArrayConnectivityMonitor() {
}
}
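
ArrayConnectivityMonitor loops forever, so callers run it on a goroutine of its own. A minimal sketch of that wiring, assuming a startup helper that is not part of this PR:

// Hypothetical wiring, for illustration only: the monitor is a blocking
// poll loop, so it is launched on a dedicated goroutine at startup.
func startMonitors(cm *PodMonitorType) {
    go cm.ArrayConnectivityMonitor() // polls at NODE_CONNECTIVITY_POLL_RATE until the process exits
}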

+// ProcessPodInfoForCleanup processes a ControllerPodInfo for cleanup, checking that the pod's UID and node are still the same, and then calling controllerCleanupPod.
+func (cm *PodMonitorType) ProcessPodInfoForCleanup(podInfo *ControllerPodInfo, reason string) {
Collaborator review comment: This func is not required to be exported!
+    podNamespace, podName := splitPodKey(podInfo.PodKey)
+    ctx, cancel := K8sAPI.GetContext(MediumTimeout)
+    defer cancel()
+    pod, err := K8sAPI.GetPod(ctx, podNamespace, podName)
+    if err == nil {
+        if string(pod.ObjectMeta.UID) == podInfo.PodUID && pod.Spec.NodeName == podInfo.Node.ObjectMeta.Name {
+            log.Infof("Cleaning up pod %s/%s because of %s", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, reason)
+            cm.controllerCleanupPod(pod, podInfo.Node, reason, false, false)
+        } else {
+            log.Infof("Skipping pod %s/%s podUID %s %s node %s %s", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name,
+                string(pod.ObjectMeta.UID), podInfo.PodUID, pod.Spec.NodeName, podInfo.Node.ObjectMeta.Name)
+        }
+    }
+}

type nodeArrayConnectivityCache struct {
initOnce sync.Once // Will be set after initialization
nodeArrayConnectivitySampled map[string]bool // if true, already sampled; if false, we need to call the array to verify connectivity
@@ -500,3 +568,59 @@ func nodeHasTaint(node *v1.Node, key string, taintEffect v1.TaintEffect) bool {
}
return false
}

+// getPodAffinityLabels returns an empty map if no pod affinity is specified. If pod affinity is specified,
+// it returns a map of pod labels for pods the specified pod should have affinity with.
+func (cm *PodMonitorType) getPodAffinityLabels(pod *v1.Pod) map[string]string {
+    result := make(map[string]string)
+    affinity := pod.Spec.Affinity
+    if affinity == nil {
+        return result
+    }
+    podAffinity := affinity.PodAffinity
+    if podAffinity == nil {
+        return result
+    }
+    requiredDuringSchedulingIgnoredDuringExecution := podAffinity.RequiredDuringSchedulingIgnoredDuringExecution
+    if requiredDuringSchedulingIgnoredDuringExecution == nil {
+        return result
+    }
+    for _, schedConstraints := range requiredDuringSchedulingIgnoredDuringExecution {
+        topologyKey := schedConstraints.TopologyKey
+        if topologyKey != hostNameTopologyKey {
+            continue
+        }
+        labelSelector := schedConstraints.LabelSelector
+        if labelSelector == nil {
+            continue
+        }
+        matchLabels := labelSelector.MatchLabels
+        for k, v := range matchLabels {
+            result[k] = v
+        }
+        for _, matchExpr := range labelSelector.MatchExpressions {
+            if matchExpr.Operator != "In" {
+                continue
+            }
+            for _, v := range matchExpr.Values {
+                result[matchExpr.Key] = v
+            }
+        }
+    }
+    return result
+}
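
For illustration, here is the shape of pod spec this helper recognizes, as a standalone sketch; the "app: db" label and all names are made up, and only a term whose TopologyKey is kubernetes.io/hostname is considered:

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    // A pod requiring co-scheduling on the same host as pods labeled app=db.
    pod := &v1.Pod{
        Spec: v1.PodSpec{
            Affinity: &v1.Affinity{
                PodAffinity: &v1.PodAffinity{
                    RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{{
                        TopologyKey:   "kubernetes.io/hostname",
                        LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "db"}},
                    }},
                },
            },
        },
    }
    // getPodAffinityLabels(pod) would return map[app:db] for this spec.
    fmt.Println(pod.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution[0].LabelSelector.MatchLabels)
}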

+// mapEqualsMap returns true IFF string map1 contains the same elements as map2
+func mapEqualsMap(map1, map2 map[string]string) bool {
+    if len(map1) != len(map2) {
+        return false
+    }
+    for k1, v1 := range map1 {
+        v2, ok := map2[k1]
+        if !ok || v2 != v1 {
+            return false
+        }
+    }
+    return true
+}
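
A quick sketch of the intended semantics, with made-up label sets: key order is irrelevant, and a missing key, differing value, or differing length fails the comparison.

// Illustrative checks; the label sets are made up.
a := map[string]string{"app": "db", "tier": "backend"}
b := map[string]string{"tier": "backend", "app": "db"}
mapEqualsMap(a, b)                              // true: same pairs, order irrelevant
mapEqualsMap(a, map[string]string{"app": "db"}) // false: lengths differ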