Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update pods check needs removal #1954

Merged
27 changes: 25 additions & 2 deletions controllers/update_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (

"github.com/FoundationDB/fdb-kubernetes-operator/pkg/fdbstatus"

"github.com/FoundationDB/fdb-kubernetes-operator/internal/replacements"

fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2"
"github.com/FoundationDB/fdb-kubernetes-operator/internal"
"github.com/go-logr/logr"
Expand All @@ -39,7 +41,14 @@ type updatePods struct{}

// reconcile runs the reconciler's work.
func (updatePods) reconcile(ctx context.Context, r *FoundationDBClusterReconciler, cluster *fdbv1beta2.FoundationDBCluster, status *fdbv1beta2.FoundationDBStatus, logger logr.Logger) *requeue {
updates, err := getPodsToUpdate(ctx, logger, r, cluster)
// TODO(johscheuer): Remove the pvc map an make direct calls.
pvcs := &corev1.PersistentVolumeClaimList{}
err := r.List(ctx, pvcs, internal.GetPodListOptions(cluster, "", "")...)
if err != nil {
return &requeue{curError: err}
}

updates, err := getPodsToUpdate(ctx, logger, r, cluster, internal.CreatePVCMap(cluster, pvcs))
if err != nil {
return &requeue{curError: err, delay: podSchedulingDelayDuration, delayedRequeue: true}
}
Expand Down Expand Up @@ -117,7 +126,7 @@ func getFaultDomainsWithUnavailablePods(ctx context.Context, logger logr.Logger,
}

// getPodsToUpdate returns a map of Zone to Pods mapping. The map has the fault domain as key and all Pods in that fault domain will be present as a slice of *corev1.Pod.
func getPodsToUpdate(ctx context.Context, logger logr.Logger, reconciler *FoundationDBClusterReconciler, cluster *fdbv1beta2.FoundationDBCluster) (map[string][]*corev1.Pod, error) {
func getPodsToUpdate(ctx context.Context, logger logr.Logger, reconciler *FoundationDBClusterReconciler, cluster *fdbv1beta2.FoundationDBCluster, pvcMap map[fdbv1beta2.ProcessGroupID]corev1.PersistentVolumeClaim) (map[string][]*corev1.Pod, error) {
updates := make(map[string][]*corev1.Pod)

faultDomainsWithUnavailablePods := getFaultDomainsWithUnavailablePods(ctx, logger, reconciler, cluster)
Expand Down Expand Up @@ -188,6 +197,20 @@ func getPodsToUpdate(ctx context.Context, logger logr.Logger, reconciler *Founda
continue
}

needsRemoval, err := replacements.ProcessGroupNeedsRemoval(ctx, reconciler.PodLifecycleManager, reconciler, logger, cluster, processGroup, pvcMap)
// Do not update the Pod if unable to determine if it needs to be removed.
if err != nil {
logger.V(1).Info("Skip process group, error checking if it requires a removal",
"processGroupID", processGroup.ProcessGroupID,
"error", err.Error())
continue
}
if needsRemoval {
logger.V(1).Info("Skip process group for deletion, requires a removal",
"processGroupID", processGroup.ProcessGroupID)
continue
}

logger.Info("Update Pod",
"processGroupID", processGroup.ProcessGroupID,
"reason", fmt.Sprintf("specHash has changed from %s to %s", specHash, pod.ObjectMeta.Annotations[fdbv1beta2.LastSpecKey]))
Expand Down
30 changes: 25 additions & 5 deletions controllers/update_pods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ var _ = Describe("update_pods", func() {
When("fetching all Pods that needs an update", func() {
var cluster *fdbv1beta2.FoundationDBCluster
var updates map[string][]*corev1.Pod
var pvcMap map[fdbv1beta2.ProcessGroupID]corev1.PersistentVolumeClaim
var expectedError bool
var err error

Expand All @@ -306,10 +307,16 @@ var _ = Describe("update_pods", func() {
Expect(err).NotTo(HaveOccurred())
Expect(result.Requeue).To(BeFalse())
Expect(k8sClient.Get(context.TODO(), ctrlClient.ObjectKeyFromObject(cluster), cluster)).NotTo(HaveOccurred())

allPvcs := &corev1.PersistentVolumeClaimList{}
err = clusterReconciler.List(context.TODO(), allPvcs, internal.GetPodListOptions(cluster, "", "")...)
Expect(err).NotTo(HaveOccurred())

pvcMap = internal.CreatePVCMap(cluster, allPvcs)
})

JustBeforeEach(func() {
updates, err = getPodsToUpdate(context.Background(), globalControllerLogger, clusterReconciler, cluster)
updates, err = getPodsToUpdate(context.Background(), globalControllerLogger, clusterReconciler, cluster, pvcMap)
if !expectedError {
Expect(err).NotTo(HaveOccurred())
} else {
Expand All @@ -326,7 +333,7 @@ var _ = Describe("update_pods", func() {
When("there is a spec change for all processes", func() {
BeforeEach(func() {
storageSettings := cluster.Spec.Processes[fdbv1beta2.ProcessClassGeneral]
storageSettings.PodTemplate.Spec.NodeSelector = map[string]string{"test": "test"}
storageSettings.PodTemplate.Spec.Tolerations = []corev1.Toleration{{Key: "test", Operator: "Exists", Effect: "NoSchedule"}}
cluster.Spec.Processes[fdbv1beta2.ProcessClassGeneral] = storageSettings
Expect(k8sClient.Update(context.TODO(), cluster)).NotTo(HaveOccurred())
})
Expand All @@ -336,14 +343,27 @@ var _ = Describe("update_pods", func() {
Expect(updates).To(HaveLen(1))
})
})
When("there is a spec change requiring a removal", func() {
BeforeEach(func() {
storageSettings := cluster.Spec.Processes[fdbv1beta2.ProcessClassGeneral]
// Updates to NodeSelector requires a removal
storageSettings.PodTemplate.Spec.NodeSelector = map[string]string{"test": "test"}
cluster.Spec.Processes[fdbv1beta2.ProcessClassGeneral] = storageSettings
Expect(k8sClient.Update(context.TODO(), cluster)).NotTo(HaveOccurred())
})

It("should return no updates", func() {
Expect(updates).To(HaveLen(0))
})
})

When("max zones with unavailable pods is set to 3 and there are two process groups with pods in pending status", func() {
BeforeEach(func() {
expectedError = false
cluster.Spec.MaxZonesWithUnavailablePods = pointer.Int(3)
// Update all processes
storageSettings := cluster.Spec.Processes[fdbv1beta2.ProcessClassGeneral]
storageSettings.PodTemplate.Spec.NodeSelector = map[string]string{"test": "test"}
storageSettings.PodTemplate.Spec.Tolerations = []corev1.Toleration{{Key: "test", Operator: "Exists", Effect: "NoSchedule"}}
cluster.Spec.Processes[fdbv1beta2.ProcessClassGeneral] = storageSettings
Expect(k8sClient.Update(context.TODO(), cluster)).NotTo(HaveOccurred())

Expand All @@ -366,7 +386,7 @@ var _ = Describe("update_pods", func() {
cluster.Spec.MaxZonesWithUnavailablePods = pointer.Int(2)
// Update all processes
storageSettings := cluster.Spec.Processes[fdbv1beta2.ProcessClassGeneral]
storageSettings.PodTemplate.Spec.NodeSelector = map[string]string{"test": "test"}
storageSettings.PodTemplate.Spec.Tolerations = []corev1.Toleration{{Key: "test", Operator: "Exists", Effect: "NoSchedule"}}
cluster.Spec.Processes[fdbv1beta2.ProcessClassGeneral] = storageSettings
Expect(k8sClient.Update(context.TODO(), cluster)).NotTo(HaveOccurred())

Expand All @@ -389,7 +409,7 @@ var _ = Describe("update_pods", func() {
cluster.Spec.MaxZonesWithUnavailablePods = pointer.Int(1)
// Update all processes
storageSettings := cluster.Spec.Processes[fdbv1beta2.ProcessClassGeneral]
storageSettings.PodTemplate.Spec.NodeSelector = map[string]string{"test": "test"}
storageSettings.PodTemplate.Spec.Tolerations = []corev1.Toleration{{Key: "test", Operator: "Exists", Effect: "NoSchedule"}}
cluster.Spec.Processes[fdbv1beta2.ProcessClassGeneral] = storageSettings
Expect(k8sClient.Update(context.TODO(), cluster)).NotTo(HaveOccurred())

Expand Down
64 changes: 34 additions & 30 deletions internal/replacements/replacements.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,35 +53,11 @@ func ReplaceMisconfiguredProcessGroups(ctx context.Context, podManager podmanage
continue
}

// TODO(johscheuer): Fix how we fetch the pvc to make better use of the controller runtime cache.
pvc, hasPVC := pvcMap[processGroup.ProcessGroupID]
pod, podErr := podManager.GetPod(ctx, client, cluster, processGroup.GetPodName(cluster))
if hasPVC {
needsPVCRemoval, err := processGroupNeedsRemovalForPVC(cluster, pvc, log, processGroup)
if err != nil {
return hasReplacements, err
}

if needsPVCRemoval && podErr == nil {
processGroup.MarkForRemoval()
hasReplacements = true
maxReplacements--
continue
}
} else if processGroup.ProcessClass.IsStateful() {
log.V(1).Info("Could not find PVC for process group ID",
"processGroupID", processGroup.ProcessGroupID)
}
needsRemoval, err := ProcessGroupNeedsRemoval(ctx, podManager, client, log, cluster, processGroup, pvcMap)

if podErr != nil {
log.V(1).Info("Could not find Pod for process group ID",
"processGroupID", processGroup.ProcessGroupID)
continue
}

needsRemoval, err := processGroupNeedsRemoval(cluster, pod, processGroup, log)
// Do not mark for removal if there is an error
if err != nil {
return hasReplacements, err
continue
}

if needsRemoval {
Expand All @@ -94,9 +70,37 @@ func ReplaceMisconfiguredProcessGroups(ctx context.Context, podManager podmanage
return hasReplacements, nil
}

// ProcessGroupNeedsRemoval checks if a process group needs to be removed.
func ProcessGroupNeedsRemoval(ctx context.Context, podManager podmanager.PodLifecycleManager, client client.Client, log logr.Logger, cluster *fdbv1beta2.FoundationDBCluster, processGroup *fdbv1beta2.ProcessGroupStatus, pvcMap map[fdbv1beta2.ProcessGroupID]corev1.PersistentVolumeClaim) (bool, error) {
// TODO(johscheuer): Fix how we fetch the pvc to make better use of the controller runtime cache.
pvc, hasPVC := pvcMap[processGroup.ProcessGroupID]
pod, podErr := podManager.GetPod(ctx, client, cluster, processGroup.GetPodName(cluster))
if hasPVC {
needsPVCRemoval, err := processGroupNeedsRemovalForPVC(cluster, pvc, log, processGroup)
if err != nil {
return false, err
}

if needsPVCRemoval && podErr == nil {
return true, nil
}
} else if processGroup.ProcessClass.IsStateful() {
log.V(1).Info("Could not find PVC for process group ID",
"processGroupID", processGroup.ProcessGroupID)
}

if podErr != nil {
log.V(1).Info("Could not find Pod for process group ID",
"processGroupID", processGroup.ProcessGroupID)
return false, podErr
}

return processGroupNeedsRemovalForPod(cluster, pod, processGroup, log)
}

func processGroupNeedsRemovalForPVC(cluster *fdbv1beta2.FoundationDBCluster, pvc corev1.PersistentVolumeClaim, log logr.Logger, processGroup *fdbv1beta2.ProcessGroupStatus) (bool, error) {
processGroupID := internal.GetProcessGroupIDFromMeta(cluster, pvc.ObjectMeta)
logger := log.WithValues("namespace", cluster.Namespace, "cluster", cluster.Name, "pvc", pvc.Name, "processGroupID", processGroupID, "reconciler", "replaceMisconfiguredProcessGroups")
logger := log.WithValues("namespace", cluster.Namespace, "cluster", cluster.Name, "pvc", pvc.Name, "processGroupID", processGroupID)
johscheuer marked this conversation as resolved.
Show resolved Hide resolved

ownedByCluster := !cluster.ShouldFilterOnOwnerReferences()
if !ownedByCluster {
Expand Down Expand Up @@ -135,12 +139,12 @@ func processGroupNeedsRemovalForPVC(cluster *fdbv1beta2.FoundationDBCluster, pvc
return false, nil
}

func processGroupNeedsRemoval(cluster *fdbv1beta2.FoundationDBCluster, pod *corev1.Pod, processGroupStatus *fdbv1beta2.ProcessGroupStatus, log logr.Logger) (bool, error) {
func processGroupNeedsRemovalForPod(cluster *fdbv1beta2.FoundationDBCluster, pod *corev1.Pod, processGroupStatus *fdbv1beta2.ProcessGroupStatus, log logr.Logger) (bool, error) {
if pod == nil {
return false, nil
}

logger := log.WithValues("namespace", cluster.Namespace, "cluster", cluster.Name, "processGroupID", processGroupStatus.ProcessGroupID, "reconciler", "replaceMisconfiguredProcessGroups")
logger := log.WithValues("namespace", cluster.Namespace, "cluster", cluster.Name, "processGroupID", processGroupStatus.ProcessGroupID)

if processGroupStatus.IsMarkedForRemoval() {
return false, nil
Expand Down
Loading
Loading