Skip to content

Commit

Permalink
fix: concurrent apply / status checks -> LVMCluster
Browse files Browse the repository at this point in the history
  • Loading branch information
jakobmoellerdev committed Aug 21, 2023
1 parent fe456ff commit 4aca845
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 11 deletions.
25 changes: 25 additions & 0 deletions controllers/internal/multierror.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package internal

import (
"strings"
)

// NewMultiErrorWithNewLineSeparator creates a MultiError that uses "\n" as separator for each error.
func NewMultiErrorWithNewLineSeparator(errs []error) error {
return &MultiError{Errors: errs, Separator: "\n"}
}

// MultiError is an error that aggregates multiple errors together and uses
// a separator to aggregate them when called with Error.
type MultiError struct {
Errors []error
Separator string
}

func (m *MultiError) Error() string {
errs := make([]string, len(m.Errors))
for i, err := range m.Errors {
errs[i] = err.Error()
}
return strings.Join(errs, m.Separator)
}
37 changes: 27 additions & 10 deletions controllers/lvmcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/go-logr/logr"
configv1 "github.com/openshift/api/config/v1"
secv1client "github.com/openshift/client-go/security/clientset/versioned/typed/security/v1"

lvmv1alpha1 "github.com/openshift/lvm-operator/api/v1alpha1"
"github.com/openshift/lvm-operator/controllers/internal"

topolvmv1 "github.com/topolvm/topolvm/api/v1"

Expand Down Expand Up @@ -165,7 +165,7 @@ func (r *LVMClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// errors returned by this will be updated in the reconcileSucceeded condition of the LVMCluster
func (r *LVMClusterReconciler) reconcile(ctx context.Context, instance *lvmv1alpha1.LVMCluster) (ctrl.Result, error) {

//The resource was deleted
// The resource was deleted
if !instance.DeletionTimestamp.IsZero() {
// Check for existing LogicalVolumes
lvsExist, err := r.logicalVolumesExist(ctx, instance)
Expand Down Expand Up @@ -197,7 +197,7 @@ func (r *LVMClusterReconciler) reconcile(ctx context.Context, instance *lvmv1alp
r.Log.Info("successfully added finalizer")
}

resourceCreationList := []resourceManager{
resources := []resourceManager{
&csiDriver{},
&topolvmController{r.TopoLVMLeaderElectionPassthrough},
&openshiftSccs{},
Expand All @@ -208,16 +208,33 @@ func (r *LVMClusterReconciler) reconcile(ctx context.Context, instance *lvmv1alp
&topolvmVolumeSnapshotClass{},
}

// handle create/update
for _, unit := range resourceCreationList {
err := unit.ensureCreated(r, ctx, instance)
if err != nil {
r.Log.Error(err, "failed to reconcile", "resource", unit.getName())
return ctrl.Result{}, err
resourceSyncStart := time.Now()
results := make(chan error, len(resources))
create := func(i int) {
if err := resources[i].ensureCreated(r, ctx, instance); err != nil {
results <- err
}
}

for i := range resources {
go create(i)
}

var errs []error
for i := 0; i < len(resources); i++ {
if err := <-results; err != nil {
errs = append(errs, err)
}
}

r.Log.Info("successfully reconciled LVMCluster")
resourceSyncElapsedTime := time.Since(resourceSyncStart)
if len(errs) > 0 {
err := internal.NewMultiErrorWithNewLineSeparator(errs)
r.Log.Error(err, "failed to reconcile resources managed by LVMCluster", "resourceSyncElapsedTime", resourceSyncElapsedTime)
return ctrl.Result{}, err
}

r.Log.Info("successfully reconciled LVMCluster", "resourceSyncElapsedTime", resourceSyncElapsedTime)

return ctrl.Result{}, nil
}
Expand Down
7 changes: 6 additions & 1 deletion controllers/topolvm_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ package controllers
import (
"context"
"fmt"
v1 "github.com/openshift/api/config/v1"
"path/filepath"

v1 "github.com/openshift/api/config/v1"
lvmv1alpha1 "github.com/openshift/lvm-operator/api/v1alpha1"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -77,6 +77,11 @@ func (c topolvmController) ensureCreated(r *LVMClusterReconciler, ctx context.Co
return err
}

if err := verifyDeploymentReadiness(existingDeployment); err != nil {
r.Log.Error(err, "csi controller is not considered ready", "deployment", existingDeployment.Name)
return err
}

r.Log.Info("csi controller", "operation", result, "name", desiredDeployment.Name)
return nil
}
Expand Down
6 changes: 6 additions & 0 deletions controllers/topolvm_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ func (n topolvmNode) ensureCreated(r *LVMClusterReconciler, ctx context.Context,
} else {
r.Log.Info(topolvmNodeName, "operation", result, "name", ds.Name)
}

if err := verifyDaemonSetReadiness(ds); err != nil {
r.Log.Error(err, "DaemonSet is not considered ready", "DaemonSet", topolvmNodeName)
return err
}

return err
}

Expand Down
47 changes: 47 additions & 0 deletions controllers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ limitations under the License.
package controllers

import (
"fmt"

lvmv1alpha1 "github.com/openshift/lvm-operator/api/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

// extractNodeSelectorAndTolerations combines and extracts scheduling parameters from the multiple deviceClass entries in an lvmCluster
Expand Down Expand Up @@ -64,3 +67,47 @@ func getStorageClassName(deviceName string) string {
func getVolumeSnapshotClassName(deviceName string) string {
return volumeSnapshotClassPrefix + deviceName
}

func verifyDaemonSetReadiness(ds *appsv1.DaemonSet) error {
// If the update strategy is not a rolling update, there will be nothing to wait for
if ds.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType {
return nil
}

// Make sure all the updated pods have been scheduled
if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled {
return fmt.Errorf("the DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", ds.Namespace, ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled)
}
maxUnavailable, err := intstr.GetScaledValueFromIntOrPercent(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, int(ds.Status.DesiredNumberScheduled), true)
if err != nil {
// If for some reason the value is invalid, set max unavailable to the
// number of desired replicas. This is the same behavior as the
// `MaxUnavailable` function in deploymentutil
maxUnavailable = int(ds.Status.DesiredNumberScheduled)
}

expectedReady := int(ds.Status.DesiredNumberScheduled) - maxUnavailable
if !(int(ds.Status.NumberReady) >= expectedReady) {
return fmt.Errorf("the DaemonSet is not ready: %s/%s. %d out of %d expected pods are ready", ds.Namespace, ds.Name, ds.Status.NumberReady, expectedReady)
}
return nil
}

func verifyDeploymentReadiness(dep *appsv1.Deployment) error {
if len(dep.Status.Conditions) == 0 {
return fmt.Errorf("the Deployment is not ready: %s/%s. deployment cannot be ready as no condition was found", dep.Namespace, dep.Name)
}
for _, condition := range dep.Status.Conditions {
if condition.Type == appsv1.DeploymentAvailable && condition.Status == corev1.ConditionFalse {
return fmt.Errorf("the Deployment is not ready: %s/%s. deployment has not reached minimum availability and thus not ready: %v",
dep.Namespace, dep.Name, condition)
} else if condition.Type == appsv1.DeploymentProgressing && condition.Status == corev1.ConditionFalse {
return fmt.Errorf("the Deployment is not ready: %s/%s. deployment has not progressed and is thus not ready: %v",
dep.Namespace, dep.Name, condition)
} else if condition.Type == appsv1.DeploymentReplicaFailure && condition.Status == corev1.ConditionTrue {
return fmt.Errorf("the Deployment is not ready: %s/%s. deployment has a replica failure and is thus not ready: %v",
dep.Namespace, dep.Name, condition)
}
}
return nil
}
6 changes: 6 additions & 0 deletions controllers/vgmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ func (v vgManager) ensureCreated(r *LVMClusterReconciler, ctx context.Context, l
} else {
unitLogger.Info("daemonset unchanged")
}

if err := verifyDaemonSetReadiness(ds); err != nil {
r.Log.Error(err, "DaemonSet is not considered ready", "DaemonSet", topolvmNodeName)
return err
}

return err
}

Expand Down

0 comments on commit 4aca845

Please sign in to comment.