diff --git a/cmd/vgmanager/main.go b/cmd/vgmanager/main.go index 3dd435970..61be8ede4 100644 --- a/cmd/vgmanager/main.go +++ b/cmd/vgmanager/main.go @@ -1,5 +1,5 @@ /* -Copyright 2021 Red Hat Openshift Data Foundation. +Copyright © 2023 Red Hat, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -17,28 +17,25 @@ limitations under the License. package main import ( - - // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) - // to ensure that exec-entrypoint and run can make use of them. - "flag" "os" - _ "k8s.io/client-go/plugin/pkg/client/auth" - lvmv1alpha1 "github.com/openshift/lvm-operator/api/v1alpha1" "github.com/openshift/lvm-operator/pkg/vgmanager" + "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" + // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) + // to ensure that exec-entrypoint and run can make use of them. + _ "k8s.io/client-go/plugin/pkg/client/auth" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" ) var ( - scheme = runtime.NewScheme() - setupLog = ctrl.Log.WithName("setup") + scheme = runtime.NewScheme() ) func init() { @@ -46,21 +43,22 @@ func init() { utilruntime.Must(lvmv1alpha1.AddToScheme(scheme)) } -var metricsAddr string -var probeAddr string -var developmentMode bool - func main() { + var metricsAddr string + var probeAddr string + var developmentMode bool flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") - flag.BoolVar(&developmentMode, "development", false, "enable to enable development") + flag.BoolVar(&developmentMode, "development", false, "The switch to enable development mode.") + flag.Parse() + opts := zap.Options{} opts.BindFlags(flag.CommandLine) - flag.Parse() opts.Development = developmentMode - ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) + setupLog := ctrl.Log.WithName("setup") + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, MetricsBindAddress: metricsAddr, @@ -80,7 +78,7 @@ func main() { NodeName: os.Getenv("NODE_NAME"), Namespace: os.Getenv("POD_NAMESPACE"), }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "LVMCluster") + setupLog.Error(err, "unable to create controller", "controller", "VGManager") os.Exit(1) } diff --git a/pkg/vgmanager/devices.go b/pkg/vgmanager/devices.go new file mode 100644 index 000000000..1e4bb87c4 --- /dev/null +++ b/pkg/vgmanager/devices.go @@ -0,0 +1,205 @@ +/* +Copyright © 2023 Red Hat, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vgmanager + +import ( + "fmt" + "path/filepath" + + lvmv1alpha1 "github.com/openshift/lvm-operator/api/v1alpha1" + "github.com/openshift/lvm-operator/pkg/internal" +) + +func (r *VGReconciler) addDevicesToVG(vgName string, devices []internal.BlockDevice) error { + if len(devices) < 1 { + return fmt.Errorf("can't create vg %q with 0 devices", vgName) + } + + // check if volume group is already present + vgs, err := ListVolumeGroups(r.executor) + if err != nil { + return fmt.Errorf("failed to list volume groups. %v", err) + } + + vgFound := false + for _, vg := range vgs { + if vg.Name == vgName { + vgFound = true + } + } + + args := []string{vgName} + for _, device := range devices { + if device.DevicePath != "" { + args = append(args, device.DevicePath) + } else { + args = append(args, device.KName) + } + } + + // TODO: Check if we can use functions from lvm.go here + var cmd string + if vgFound { + r.Log.Info("extending an existing volume group", "VGName", vgName) + cmd = "/usr/sbin/vgextend" + } else { + r.Log.Info("creating a new volume group", "VGName", vgName) + cmd = "/usr/sbin/vgcreate" + } + + _, err = r.executor.ExecuteCommandWithOutputAsHost(cmd, args...) + if err != nil { + return fmt.Errorf("failed to create or extend volume group %q. %v", vgName, err) + } + + return nil +} + +func (r *VGReconciler) getMatchingDevicesForVG(volumeGroup *lvmv1alpha1.LVMVolumeGroup) ([]internal.BlockDevice, []internal.BlockDevice, error) { + // The LVMVolumeGroup was created/modified + r.Log.Info("getting block devices for volumegroup", "VGName", volumeGroup.Name) + + // list block devices + blockDevices, err := internal.ListBlockDevices(r.executor) + if err != nil { + return nil, nil, fmt.Errorf("failed to list block devices: %v", err) + } + + // filter out block devices + remainingValidDevices, delayedDevices := r.filterAvailableDevices(blockDevices) + + var matchingDevices []internal.BlockDevice + matchingDevices, err = r.filterMatchingDevices(remainingValidDevices, volumeGroup) + if err != nil { + r.Log.Error(err, "could not filter matching devices", "VGName", volumeGroup.Name) + return nil, nil, err + } + + return matchingDevices, delayedDevices, nil +} + +// filterAvailableDevices returns: +// validDevices: the list of blockdevices considered available +// delayedDevices: the list of blockdevices considered available, but first observed less than 'minDeviceAge' time ago +func (r *VGReconciler) filterAvailableDevices(blockDevices []internal.BlockDevice) ([]internal.BlockDevice, []internal.BlockDevice) { + var availableDevices, delayedDevices []internal.BlockDevice + // using a label so `continue DeviceLoop` can be used to skip devices +DeviceLoop: + for _, blockDevice := range blockDevices { + + // store device in deviceAgeMap + r.deviceAgeMap.storeDeviceAge(blockDevice.KName) + + // check for partitions recursively + if blockDevice.HasChildren() { + childAvailableDevices, childDelayedDevices := r.filterAvailableDevices(blockDevice.Children) + availableDevices = append(availableDevices, childAvailableDevices...) + delayedDevices = append(delayedDevices, childDelayedDevices...) + } + + devLogger := r.Log.WithValues("Device.Name", blockDevice.Name) + for name, filter := range FilterMap { + filterLogger := devLogger.WithValues("filter.Name", name) + valid, err := filter(blockDevice, r.executor) + if err != nil { + filterLogger.Error(err, "filter error") + valid = false + continue DeviceLoop + } else if !valid { + filterLogger.Info("does not match filter") + continue DeviceLoop + } + } + // check if the device is older than deviceMinAge + isOldEnough := r.deviceAgeMap.isOlderThan(blockDevice.KName) + if isOldEnough { + availableDevices = append(availableDevices, blockDevice) + } else { + delayedDevices = append(delayedDevices, blockDevice) + } + } + return availableDevices, delayedDevices +} + +// filterMatchingDevices returns matched blockdevices +func (r *VGReconciler) filterMatchingDevices(blockDevices []internal.BlockDevice, volumeGroup *lvmv1alpha1.LVMVolumeGroup) ([]internal.BlockDevice, error) { + var filteredBlockDevices []internal.BlockDevice + if volumeGroup.Spec.DeviceSelector != nil && len(volumeGroup.Spec.DeviceSelector.Paths) > 0 { + for _, path := range volumeGroup.Spec.DeviceSelector.Paths { + diskName, err := filepath.EvalSymlinks(path) + if err != nil { + err = fmt.Errorf("unable to find symlink for disk path %s: %v", path, err) + return []internal.BlockDevice{}, err + } + + blockDevice, ok := hasExactDisk(blockDevices, diskName) + if !ok { + err := fmt.Errorf("can not find device name %s in the available block devices", path) + return []internal.BlockDevice{}, err + } + + blockDevice.DevicePath = path + filteredBlockDevices = append(filteredBlockDevices, blockDevice) + } + + return filteredBlockDevices, nil + } + + // return all available block devices if none is specified in the CR + return blockDevices, nil +} + +// filterOutAttachedDevices filters out already attached devices to the volume group +func (r *VGReconciler) filterOutAttachedDevices(blockDevices []internal.BlockDevice, volumeGroup *lvmv1alpha1.LVMVolumeGroup) ([]internal.BlockDevice, error) { + var filteredBlockDevices []internal.BlockDevice + + vgs, err := ListVolumeGroups(r.executor) + if err != nil { + return []internal.BlockDevice{}, fmt.Errorf("failed to list volume groups. %v", err) + } + + for _, device := range blockDevices { + if !isDeviceAlreadyPartOfVG(vgs, device.Name, volumeGroup) { + filteredBlockDevices = append(filteredBlockDevices, device) + } + } + + return filteredBlockDevices, nil +} + +func isDeviceAlreadyPartOfVG(vgs []VolumeGroup, diskName string, volumeGroup *lvmv1alpha1.LVMVolumeGroup) bool { + for _, vg := range vgs { + if vg.Name == volumeGroup.Name { + for _, pv := range vg.PVs { + if pv == diskName { + return true + } + } + } + } + + return false +} + +func hasExactDisk(blockDevices []internal.BlockDevice, deviceName string) (internal.BlockDevice, bool) { + for _, blockDevice := range blockDevices { + if blockDevice.KName == deviceName { + return blockDevice, true + } + } + return internal.BlockDevice{}, false +} diff --git a/pkg/vgmanager/status.go b/pkg/vgmanager/status.go new file mode 100644 index 000000000..fc19182bb --- /dev/null +++ b/pkg/vgmanager/status.go @@ -0,0 +1,151 @@ +/* +Copyright © 2023 Red Hat, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vgmanager + +import ( + "context" + "fmt" + + lvmv1alpha1 "github.com/openshift/lvm-operator/api/v1alpha1" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +func (r *VGReconciler) setVolumeGroupReadyStatus(ctx context.Context, vgName string) error { + status := &lvmv1alpha1.VGStatus{ + Name: vgName, + Status: lvmv1alpha1.VGStatusReady, + } + + // Set devices for the VGStatus. + if _, err := r.setDevices(status); err != nil { + return err + } + + return r.setVolumeGroupStatus(ctx, status) +} + +func (r *VGReconciler) setVolumeGroupFailedStatus(ctx context.Context, vgName string, reason string) error { + status := &lvmv1alpha1.VGStatus{ + Name: vgName, + Status: lvmv1alpha1.VGStatusFailed, + Reason: reason, + } + + // Set devices for the VGStatus. + // If there is backing volume group, then set as degraded + devicesExist, err := r.setDevices(status) + if err != nil { + return err + } + if devicesExist { + status.Status = lvmv1alpha1.VGStatusDegraded + } + + return r.setVolumeGroupStatus(ctx, status) +} + +func (r *VGReconciler) setVolumeGroupStatus(ctx context.Context, status *lvmv1alpha1.VGStatus) error { + // Get LVMVolumeGroupNodeStatus and set the relevant VGStatus + nodeStatus := &lvmv1alpha1.LVMVolumeGroupNodeStatus{ + ObjectMeta: metav1.ObjectMeta{ + Name: r.NodeName, + Namespace: r.Namespace, + }, + } + + result, err := ctrl.CreateOrUpdate(ctx, r.Client, nodeStatus, func() error { + exists := false + for i, existingVGStatus := range nodeStatus.Spec.LVMVGStatus { + if existingVGStatus.Name == status.Name { + exists = true + nodeStatus.Spec.LVMVGStatus[i] = *status + } + } + if !exists { + nodeStatus.Spec.LVMVGStatus = append(nodeStatus.Spec.LVMVGStatus, *status) + } + + return nil + }) + if err != nil { + r.Log.Error(err, "failed to create or update LVMVolumeGroupNodeStatus", "name", nodeStatus.Name) + return err + } else if result != controllerutil.OperationResultNone { + r.Log.Info("LVMVolumeGroupNodeStatus modified", "operation", result, "name", nodeStatus.Name) + } else { + r.Log.Info("LVMVolumeGroupNodeStatus unchanged") + } + + return nil +} + +func (r *VGReconciler) removeVolumeGroupStatus(ctx context.Context, vgName string) error { + // Get LVMVolumeGroupNodeStatus and remove the relevant VGStatus + nodeStatus := &lvmv1alpha1.LVMVolumeGroupNodeStatus{ + ObjectMeta: metav1.ObjectMeta{ + Name: r.NodeName, + Namespace: r.Namespace, + }, + } + + exist := false + index := 0 + result, err := ctrl.CreateOrUpdate(ctx, r.Client, nodeStatus, func() error { + for i, existingVGStatus := range nodeStatus.Spec.LVMVGStatus { + if existingVGStatus.Name == vgName { + exist = true + index = i + } + } + + if exist { + nodeStatus.Spec.LVMVGStatus = append(nodeStatus.Spec.LVMVGStatus[:index], nodeStatus.Spec.LVMVGStatus[index+1:]...) + } + + return nil + }) + if err != nil { + r.Log.Error(err, "failed to create or update LVMVolumeGroupNodeStatus", "name", nodeStatus.Name) + return err + } else if result != controllerutil.OperationResultNone { + r.Log.Info("LVMVolumeGroupNodeStatus modified", "operation", result, "name", nodeStatus.Name) + } else { + r.Log.Info("LVMVolumeGroupNodeStatus unchanged") + } + + return nil +} + +func (r *VGReconciler) setDevices(status *lvmv1alpha1.VGStatus) (bool, error) { + vgs, err := ListVolumeGroups(r.executor) + if err != nil { + return false, fmt.Errorf("failed to list volume groups. %v", err) + } + + devicesExist := false + for _, vg := range vgs { + if status.Name == vg.Name { + devicesExist = true + status.Devices = vg.PVs + } + } + + return devicesExist, nil +} diff --git a/pkg/vgmanager/vgmanager_controller.go b/pkg/vgmanager/vgmanager_controller.go index 06e7e73f1..191315fee 100644 --- a/pkg/vgmanager/vgmanager_controller.go +++ b/pkg/vgmanager/vgmanager_controller.go @@ -1,5 +1,5 @@ /* -Copyright 2021 Red Hat Openshift Data Foundation. +Copyright © 2023 Red Hat, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -20,7 +20,6 @@ import ( "context" "fmt" "os" - "path/filepath" "strconv" "strings" "time" @@ -32,22 +31,26 @@ import ( "github.com/openshift/lvm-operator/pkg/internal" "github.com/topolvm/topolvm/lvmd" lvmdCMD "github.com/topolvm/topolvm/pkg/lvmd/cmd" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" corev1helper "k8s.io/component-helpers/scheduling/corev1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/yaml" ) const ( - ControllerName = "vg-manager" - DefaultChunkSize = "128" + ControllerName = "vg-manager" + DefaultChunkSize = "128" + reconcileInterval = 1 * time.Minute +) + +var ( + reconcileAgain = ctrl.Result{Requeue: true, RequeueAfter: reconcileInterval} ) // SetupWithManager sets up the controller with the Manager. @@ -71,9 +74,9 @@ type VGReconciler struct { func (r *VGReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { r.Log = log.FromContext(ctx).WithName(ControllerName) - r.Log.Info("reconciling", "lvmvolumegroup", req) + r.Log.Info("reconciling", "LVMVolumeGroup", req) - // Check if this lvmvolumegroup needs to be processed on this node + // Check if this LVMVolumeGroup needs to be processed on this node volumeGroup := &lvmv1alpha1.LVMVolumeGroup{} err := r.Client.Get(ctx, req.NamespacedName, volumeGroup) if err != nil { @@ -89,30 +92,23 @@ func (r *VGReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Re r.Log.Error(err, "failed to match nodeSelector to node labels", "VGName", volumeGroup.Name) return ctrl.Result{}, err } - if !nodeMatches { // Nothing to be done on this node for the VG. r.Log.Info("node labels do not match the selector", "VGName", volumeGroup.Name) return ctrl.Result{}, nil } + r.executor = &internal.CommandExecutor{} - res, err := r.reconcile(ctx, req, volumeGroup) + res, err := r.reconcile(ctx, volumeGroup) if err != nil { - r.Log.Error(err, "reconcile error", "lvmvolumegroup", req.Name) + r.Log.Error(err, "reconcile error", "LVMVolumeGroup", volumeGroup.Name) } r.Log.Info("reconcile complete", "result", res) return res, err - } -var reconcileInterval = time.Minute * 1 -var reconcileAgain = ctrl.Result{Requeue: true, RequeueAfter: reconcileInterval} - -//TODO: Refactor this function to move the ctrl result to a single place - -func (r *VGReconciler) reconcile(ctx context.Context, req ctrl.Request, volumeGroup *lvmv1alpha1.LVMVolumeGroup) (ctrl.Result, error) { - - // The LVMVolumeGroup resource was deleted +func (r *VGReconciler) reconcile(ctx context.Context, volumeGroup *lvmv1alpha1.LVMVolumeGroup) (ctrl.Result, error) { + // Check if the LVMVolumeGroup resource is deleted if !volumeGroup.DeletionTimestamp.IsZero() { err := r.processDelete(ctx, volumeGroup) return ctrl.Result{}, err @@ -121,11 +117,12 @@ func (r *VGReconciler) reconcile(ctx context.Context, req ctrl.Request, volumeGr // Read the lvmd config file lvmdConfig, err := loadLVMDConfig() if err != nil { - // Failed to read lvmdconfig file. Reconcile again r.Log.Error(err, "failed to read the lvmd config file") + if statuserr := r.setVolumeGroupFailedStatus(ctx, volumeGroup.Name, fmt.Sprintf("failed to read the lvmd config file: %v", err.Error())); statuserr != nil { + r.Log.Error(statuserr, "failed to update status", "name", volumeGroup.Name) + } return reconcileAgain, err } - if lvmdConfig == nil { // The lvmdconfig file does not exist and will be created. r.Log.Info("lvmd config file doesn't exist, will create") @@ -135,26 +132,15 @@ func (r *VGReconciler) reconcile(ctx context.Context, req ctrl.Request, volumeGr } existingLvmdConfig := *lvmdConfig - // To avoid having to iterate through device classes multiple times, map from name to config index - deviceClassMap := make(map[string]int) - for i, deviceClass := range lvmdConfig.DeviceClasses { - deviceClassMap[deviceClass.Name] = i - } - - status := &lvmv1alpha1.VGStatus{ - Name: req.Name, - } - _, found := deviceClassMap[volumeGroup.Name] - //Get the block devices that can be used for this volumegroup matchingDevices, delayedDevices, err := r.getMatchingDevicesForVG(volumeGroup) if err != nil { - r.Log.Error(err, "failed to get block devices for volumegroup", "name", volumeGroup.Name) - - status.Reason = err.Error() - if statuserr := r.updateStatus(ctx, status); statuserr != nil { - r.Log.Error(statuserr, "failed to update status", "name", volumeGroup.Name) - return reconcileAgain, nil + r.Log.Error(err, "failed to get block devices for volumegroup, will retry", "name", volumeGroup.Name) + // Set a failure status only if there is an error and there is no delayed devices. If there are delayed devices, there is a chance that this will pass in the next reconciliation. + if len(delayedDevices) == 0 { + if statuserr := r.setVolumeGroupFailedStatus(ctx, volumeGroup.Name, fmt.Sprintf("failed to get block devices for volumegroup %s: %v", volumeGroup.Name, err.Error())); statuserr != nil { + r.Log.Error(statuserr, "failed to update status", "name", volumeGroup.Name) + } } // Failed to get devices for this vg. Reconcile again. @@ -162,34 +148,61 @@ func (r *VGReconciler) reconcile(ctx context.Context, req ctrl.Request, volumeGr } if len(matchingDevices) == 0 { - r.Log.Info("no matching devices found for volume group", "VGName", volumeGroup.Name) + r.Log.Error(fmt.Errorf("no matching devices found for volume group"), "VGName", volumeGroup.Name) if len(delayedDevices) > 0 { return ctrl.Result{Requeue: true, RequeueAfter: 30 * time.Second}, nil //30 seconds to make sure delayed devices become available } - if found { - // Update the status again just to be safe. - if statuserr := r.updateStatus(ctx, nil); statuserr != nil { - r.Log.Error(statuserr, "failed to update status", "name", volumeGroup.Name) - return reconcileAgain, nil - } + if statuserr := r.setVolumeGroupFailedStatus(ctx, volumeGroup.Name, "no matching devices found for volume group"); statuserr != nil { + r.Log.Error(statuserr, "failed to update status", "name", volumeGroup.Name) + return reconcileAgain, statuserr } + return ctrl.Result{}, nil + } else { + matchingDevices, err = r.filterOutAttachedDevices(matchingDevices, volumeGroup) + if err != nil { + return reconcileAgain, err + } + + if len(matchingDevices) == 0 { + r.Log.Info("all the matching devices are attached to the volume group", "VGName", volumeGroup.Name) + if statuserr := r.setVolumeGroupReadyStatus(ctx, volumeGroup.Name); statuserr != nil { + r.Log.Error(statuserr, "failed to update status", "VGName", volumeGroup.Name) + return reconcileAgain, statuserr + } + return ctrl.Result{}, nil + } } - // create/extend VG and update lvmd config + // Create/extend VG err = r.addDevicesToVG(volumeGroup.Name, matchingDevices) if err != nil { - status.Reason = err.Error() - r.Log.Error(err, "failed to create/extend volume group", "VGName", volumeGroup.Name) - - if statuserr := r.updateStatus(ctx, status); statuserr != nil { + if statuserr := r.setVolumeGroupFailedStatus(ctx, volumeGroup.Name, fmt.Sprintf("failed to create/extend volume group %s: %v", volumeGroup.Name, err.Error())); statuserr != nil { r.Log.Error(statuserr, "failed to update status", "VGName", volumeGroup.Name) } return reconcileAgain, err } + // Create thin pool + err = r.addThinPoolToVG(volumeGroup.Name, volumeGroup.Spec.ThinPoolConfig) + if err != nil { + r.Log.Error(err, "failed to create thin pool", "VGName", "ThinPool", volumeGroup.Name, volumeGroup.Spec.ThinPoolConfig.Name) + if statuserr := r.setVolumeGroupFailedStatus(ctx, volumeGroup.Name, + fmt.Sprintf("failed to create thin pool %s for volume group %s: %v", volumeGroup.Spec.ThinPoolConfig.Name, volumeGroup.Name, err.Error())); statuserr != nil { + r.Log.Error(statuserr, "failed to update status", "VGName", volumeGroup.Name) + return reconcileAgain, statuserr + } + } + + // Add the volume group to device classes if not exists + found := false + for _, deviceClass := range lvmdConfig.DeviceClasses { + if deviceClass.Name == volumeGroup.Name { + found = true + } + } if !found { dc := &lvmd.DeviceClass{ Name: volumeGroup.Name, @@ -207,30 +220,22 @@ func (r *VGReconciler) reconcile(ctx context.Context, req ctrl.Request, volumeGr lvmdConfig.DeviceClasses = append(lvmdConfig.DeviceClasses, dc) } - // Create thin pool - err = r.addThinPoolToVG(volumeGroup.Name, volumeGroup.Spec.ThinPoolConfig) - if err != nil { - r.Log.Error(err, "failed to create thin pool", "VGName", "ThinPool", volumeGroup.Name, volumeGroup.Spec.ThinPoolConfig.Name) - } - - // apply and save lvmconfig - // pass config to configChannel only if config has changed + // Apply and save lvmd config if !cmp.Equal(existingLvmdConfig, lvmdConfig) { err := saveLVMDConfig(lvmdConfig) if err != nil { - r.Log.Error(err, "failed to update lvmd.conf file", "VGName", volumeGroup.Name) + r.Log.Error(err, "failed to update lvmd config file", "VGName", volumeGroup.Name) + if statuserr := r.setVolumeGroupFailedStatus(ctx, volumeGroup.Name, fmt.Sprintf("failed to update lvmd config file: %v", err.Error())); statuserr != nil { + r.Log.Error(statuserr, "failed to update status", "name", volumeGroup.Name) + } return reconcileAgain, err } r.Log.Info("updated lvmd config", "VGName", volumeGroup.Name) } - if err == nil { - if statuserr := r.updateStatus(ctx, nil); statuserr != nil { - r.Log.Error(statuserr, "failed to update status", "VGName", volumeGroup.Name) - return reconcileAgain, nil - } - } else { - r.Log.Error(err, "failed to get volume group from the host", "name", volumeGroup.Name) + if statuserr := r.setVolumeGroupReadyStatus(ctx, volumeGroup.Name); statuserr != nil { + r.Log.Error(statuserr, "failed to update status", "VGName", volumeGroup.Name) + return reconcileAgain, nil } // requeue faster if some devices are too recently observed to consume @@ -241,78 +246,6 @@ func (r *VGReconciler) reconcile(ctx context.Context, req ctrl.Request, volumeGr return ctrl.Result{RequeueAfter: requeueAfter}, nil } -func (r *VGReconciler) addThinPoolToVG(vgName string, config *lvmv1alpha1.ThinPoolConfig) error { - resp, err := GetLVSOutput(r.executor, vgName) - if err != nil { - return fmt.Errorf("failed to list logical volumes in the volume group %q. %v", vgName, err) - } - - for _, report := range resp.Report { - for _, lv := range report.Lv { - if lv.Name == config.Name { - if strings.Contains(lv.LvAttr, "t") { - r.Log.Info("lvm thinpool already exists", "VGName", vgName, "ThinPool", config.Name) - err = r.extendThinPool(vgName, lv.LvSize, config) - if err != nil { - r.Log.Error(err, "failed to extend the lvm thinpool", "VGName", vgName, "ThinPool", config.Name) - } - return err - } - - return fmt.Errorf("failed to create thin pool %q. Logical volume with same name already exists", config.Name) - } - } - } - - args := []string{"-l", fmt.Sprintf("%d%%FREE", config.SizePercent), "-c", DefaultChunkSize, "-Z", "y", "-T", fmt.Sprintf("%s/%s", vgName, config.Name)} - - _, err = r.executor.ExecuteCommandWithOutputAsHost(lvCreateCmd, args...) - if err != nil { - return fmt.Errorf("failed to create thin pool %q in the volume group %q. %v", config.Name, vgName, err) - } - - return nil -} - -func (r *VGReconciler) extendThinPool(vgName string, lvSize string, config *lvmv1alpha1.ThinPoolConfig) error { - - vg, err := GetVolumeGroup(r.executor, vgName) - if err != nil { - if err != ErrVolumeGroupNotFound { - return fmt.Errorf("failed to get volume group. %q, %v", vgName, err) - } - return nil - } - - thinPoolSize, err := strconv.ParseFloat(lvSize[:len(lvSize)-1], 64) - if err != nil { - return fmt.Errorf("failed to parse lvSize. %v", err) - } - - vgSize, err := strconv.ParseFloat(vg.VgSize[:len(vg.VgSize)-1], 64) - if err != nil { - return fmt.Errorf("failed to parse vgSize. %v", err) - } - - // return if thinPoolSize does not require expansion - if config.SizePercent <= int((thinPoolSize/vgSize)*100) { - return nil - } - - r.Log.Info("extending lvm thinpool ", "VGName", vgName, "ThinPool", config.Name) - - args := []string{"-l", fmt.Sprintf("%d%%Vg", config.SizePercent), fmt.Sprintf("%s/%s", vgName, config.Name)} - - _, err = r.executor.ExecuteCommandWithOutputAsHost(lvExtendCmd, args...) - if err != nil { - return fmt.Errorf("failed to extend thin pool %q in the volume group %q. %v", config.Name, vgName, err) - } - - r.Log.Info("successfully extended the thin pool in the volume group ", "thinpool", config.Name, "vgName", vgName) - - return nil -} - func (r *VGReconciler) processDelete(ctx context.Context, volumeGroup *lvmv1alpha1.LVMVolumeGroup) error { // Read the lvmd config file @@ -324,9 +257,9 @@ func (r *VGReconciler) processDelete(ctx context.Context, volumeGroup *lvmv1alph } if lvmdConfig == nil { r.Log.Info("lvmd config file does not exist") - // Remove the VG entry in the lvmvolumegroupnodestatus that was added to indicate the failures to the user. - // This allows the lvmcluster to get deleted and not stuck/wait forever as lvmcluster looks for the lvmvolumegroupnodestatus before deleting. - if statuserr := r.updateStatus(ctx, nil); statuserr != nil { + // Remove the VG entry in the LVMVolumeGroupNodeStatus that was added to indicate the failures to the user. + // This allows the LVMCluster to get deleted and not stuck/wait forever as LVMCluster looks for the LVMVolumeGroupNodeStatus before deleting. + if statuserr := r.removeVolumeGroupStatus(ctx, volumeGroup.Name); statuserr != nil { r.Log.Error(statuserr, "failed to update status", "VGName", volumeGroup.Name) return statuserr } @@ -341,6 +274,10 @@ func (r *VGReconciler) processDelete(ctx context.Context, volumeGroup *lvmv1alph if !found { // Nothing to do here. r.Log.Info("failed to find volume group in lvmd deviceclasses list", "VGName", volumeGroup.Name) + if statuserr := r.removeVolumeGroupStatus(ctx, volumeGroup.Name); statuserr != nil { + r.Log.Error(statuserr, "failed to update status", "VGName", volumeGroup.Name) + return statuserr + } return nil } @@ -364,6 +301,9 @@ func (r *VGReconciler) processDelete(ctx context.Context, volumeGroup *lvmv1alph if lvExists { err := DeleteLV(r.executor, thinPoolName, volumeGroup.Name) if err != nil { + if statuserr := r.setVolumeGroupFailedStatus(ctx, volumeGroup.Name, fmt.Sprintf("failed to delete thin pool %s in volume group %s: %v", thinPoolName, volumeGroup.Name, err.Error())); statuserr != nil { + r.Log.Error(statuserr, "failed to update status", "name", volumeGroup.Name) + } return fmt.Errorf("failed to delete thin pool %q in volume group %q. %v", thinPoolName, volumeGroup.Name, err) } r.Log.Info("thin pool deleted in the volume group.", "VGName", volumeGroup.Name, "ThinPool", thinPoolName) @@ -374,12 +314,14 @@ func (r *VGReconciler) processDelete(ctx context.Context, volumeGroup *lvmv1alph err = vg.Delete(r.executor) if err != nil { + if statuserr := r.setVolumeGroupFailedStatus(ctx, volumeGroup.Name, fmt.Sprintf("failed to delete volume group %s: %v", volumeGroup.Name, err.Error())); statuserr != nil { + r.Log.Error(statuserr, "failed to update status", "name", volumeGroup.Name) + } return fmt.Errorf("failed to delete volume group. %q, %v", volumeGroup.Name, err) } // Remove this vg from the lvmdconf file lvmdConfig.DeviceClasses = append(lvmdConfig.DeviceClasses[:index], lvmdConfig.DeviceClasses[index+1:]...) - //r.Log.Info("After delete: ", "deviceclasses", lvmdConfig.DeviceClasses) r.Log.Info("updating lvmd config") if len(lvmdConfig.DeviceClasses) > 0 { @@ -396,119 +338,84 @@ func (r *VGReconciler) processDelete(ctx context.Context, volumeGroup *lvmv1alph } } - if statuserr := r.updateStatus(ctx, nil); statuserr != nil { + if statuserr := r.removeVolumeGroupStatus(ctx, volumeGroup.Name); statuserr != nil { r.Log.Error(statuserr, "failed to update status", "VGName", volumeGroup.Name) return statuserr } - return err -} -func (r *VGReconciler) addDevicesToVG(vgName string, devices []internal.BlockDevice) error { - if len(devices) < 1 { - return fmt.Errorf("can't create vg %q with 0 devices", vgName) - } + return nil +} - // check if volume group is already present - vgs, err := ListVolumeGroups(r.executor) +func (r *VGReconciler) addThinPoolToVG(vgName string, config *lvmv1alpha1.ThinPoolConfig) error { + resp, err := GetLVSOutput(r.executor, vgName) if err != nil { - return fmt.Errorf("failed to list volume groups. %v", err) + return fmt.Errorf("failed to list logical volumes in the volume group %q. %v", vgName, err) } - vgFound := false - for _, vg := range vgs { - if vg.Name == vgName { - vgFound = true - } - } + for _, report := range resp.Report { + for _, lv := range report.Lv { + if lv.Name == config.Name { + if strings.Contains(lv.LvAttr, "t") { + r.Log.Info("lvm thinpool already exists", "VGName", vgName, "ThinPool", config.Name) + err = r.extendThinPool(vgName, lv.LvSize, config) + if err != nil { + r.Log.Error(err, "failed to extend the lvm thinpool", "VGName", vgName, "ThinPool", config.Name) + } + return err + } - args := []string{vgName} - for _, device := range devices { - if device.DevicePath != "" { - args = append(args, device.DevicePath) - } else { - args = append(args, device.KName) + return fmt.Errorf("failed to create thin pool %q. Logical volume with same name already exists", config.Name) + } } } - var cmd string - if vgFound { - r.Log.Info("extending an existing volume group", "VGName", vgName) - cmd = "/usr/sbin/vgextend" - } else { - r.Log.Info("creating a new volume group", "VGName", vgName) - cmd = "/usr/sbin/vgcreate" - } + args := []string{"-l", fmt.Sprintf("%d%%FREE", config.SizePercent), "-c", DefaultChunkSize, "-Z", "y", "-T", fmt.Sprintf("%s/%s", vgName, config.Name)} - _, err = r.executor.ExecuteCommandWithOutputAsHost(cmd, args...) + _, err = r.executor.ExecuteCommandWithOutputAsHost(lvCreateCmd, args...) if err != nil { - return fmt.Errorf("failed to create or extend volume group %q. %v", vgName, err) + return fmt.Errorf("failed to create thin pool %q in the volume group %q. %v", config.Name, vgName, err) } return nil } -// filterMatchingDevices returns matched blockdevices -func (r *VGReconciler) filterMatchingDevices(blockDevices []internal.BlockDevice, volumeGroup *lvmv1alpha1.LVMVolumeGroup) ([]internal.BlockDevice, error) { - - var filteredBlockDevices []internal.BlockDevice - - if volumeGroup.Spec.DeviceSelector != nil && len(volumeGroup.Spec.DeviceSelector.Paths) > 0 { - vgs, err := ListVolumeGroups(r.executor) - if err != nil { - return []internal.BlockDevice{}, fmt.Errorf("failed to list volume groups. %v", err) - } - - for _, path := range volumeGroup.Spec.DeviceSelector.Paths { - diskName, err := filepath.EvalSymlinks(path) - if err != nil { - err = fmt.Errorf("unable to find symlink for disk path %s: %v", path, err) - return []internal.BlockDevice{}, err - } - - isAlreadyExist := isDeviceAlreadyPartOfVG(vgs, diskName, volumeGroup) - if isAlreadyExist { - continue - } - - blockDevice, ok := hasExactDisk(blockDevices, diskName) - if !ok { - err := fmt.Errorf("can not find device name %s in the available block devices", path) - return []internal.BlockDevice{}, err - } +func (r *VGReconciler) extendThinPool(vgName string, lvSize string, config *lvmv1alpha1.ThinPoolConfig) error { - blockDevice.DevicePath = path - filteredBlockDevices = append(filteredBlockDevices, blockDevice) + vg, err := GetVolumeGroup(r.executor, vgName) + if err != nil { + if err != ErrVolumeGroupNotFound { + return fmt.Errorf("failed to get volume group. %q, %v", vgName, err) } + return nil + } - return filteredBlockDevices, nil + thinPoolSize, err := strconv.ParseFloat(lvSize[:len(lvSize)-1], 64) + if err != nil { + return fmt.Errorf("failed to parse lvSize. %v", err) } - // return all available block devices if none is specified in the CR - return blockDevices, nil -} + vgSize, err := strconv.ParseFloat(vg.VgSize[:len(vg.VgSize)-1], 64) + if err != nil { + return fmt.Errorf("failed to parse vgSize. %v", err) + } -func hasExactDisk(blockDevices []internal.BlockDevice, deviceName string) (internal.BlockDevice, bool) { - for _, blockDevice := range blockDevices { - if blockDevice.KName == deviceName { - return blockDevice, true - } + // return if thinPoolSize does not require expansion + if config.SizePercent <= int((thinPoolSize/vgSize)*100) { + return nil } - return internal.BlockDevice{}, false -} -func isDeviceAlreadyPartOfVG(vgs []VolumeGroup, diskName string, volumeGroup *lvmv1alpha1.LVMVolumeGroup) bool { + r.Log.Info("extending lvm thinpool ", "VGName", vgName, "ThinPool", config.Name) - for _, vg := range vgs { - if vg.Name == volumeGroup.Name { - for _, pv := range vg.PVs { - if pv == diskName { - return true - } - } - } + args := []string{"-l", fmt.Sprintf("%d%%Vg", config.SizePercent), fmt.Sprintf("%s/%s", vgName, config.Name)} + + _, err = r.executor.ExecuteCommandWithOutputAsHost(lvExtendCmd, args...) + if err != nil { + return fmt.Errorf("failed to extend thin pool %q in the volume group %q. %v", config.Name, vgName, err) } - return false + r.Log.Info("successfully extended the thin pool in the volume group ", "thinpool", config.Name, "vgName", vgName) + + return nil } func NodeSelectorMatchesNodeLabels(node *corev1.Node, nodeSelector *corev1.NodeSelector) (bool, error) { @@ -562,156 +469,3 @@ func deleteLVMDConfig() error { err := os.Remove(controllers.LvmdConfigFile) return err } - -func (r *VGReconciler) getMatchingDevicesForVG(volumeGroup *lvmv1alpha1.LVMVolumeGroup) ([]internal.BlockDevice, []internal.BlockDevice, error) { - // The LVMVolumeGroup was created/modified - r.Log.Info("getting block devices for volumegroup", "VGName", volumeGroup.Name) - - // list block devices - blockDevices, err := internal.ListBlockDevices(r.executor) - if err != nil { - return nil, nil, fmt.Errorf("failed to list block devices: %v", err) - } - - // filter out block devices - remainingValidDevices, delayedDevices, err := r.filterAvailableDevices(blockDevices) - if err != nil { - _ = err - } - - var matchingDevices []internal.BlockDevice - matchingDevices, err = r.filterMatchingDevices(remainingValidDevices, volumeGroup) - if err != nil { - r.Log.Error(err, "could not filter matching devices", "VGName", volumeGroup.Name) - return nil, nil, err - } - - return matchingDevices, delayedDevices, nil -} - -func (r *VGReconciler) generateVolumeGroupNodeStatus(vgStatus *lvmv1alpha1.VGStatus) (*lvmv1alpha1.LVMVolumeGroupNodeStatus, error) { - - vgs, err := ListVolumeGroups(r.executor) - if err != nil { - return nil, fmt.Errorf("failed to list volume groups. %v", err) - } - - //lvmvgstatus := vgNodeStatus.Spec.LVMVGStatus - var statusarr []lvmv1alpha1.VGStatus - - var vgExists bool - - for _, vg := range vgs { - newStatus := &lvmv1alpha1.VGStatus{ - Name: vg.Name, - Devices: vg.PVs, - Status: lvmv1alpha1.VGStatusReady, - } - - if vgStatus != nil && vgStatus.Name == vg.Name { - vgExists = true - newStatus.Status = lvmv1alpha1.VGStatusDegraded - newStatus.Reason = vgStatus.Reason - } - - statusarr = append(statusarr, *newStatus) - } - - if vgStatus != nil && !vgExists { - vgStatus.Status = lvmv1alpha1.VGStatusFailed - statusarr = append(statusarr, *vgStatus) - } - - vgNodeStatus := &lvmv1alpha1.LVMVolumeGroupNodeStatus{ - ObjectMeta: metav1.ObjectMeta{ - Name: r.NodeName, - Namespace: r.Namespace, - }, - Spec: lvmv1alpha1.LVMVolumeGroupNodeStatusSpec{ - LVMVGStatus: statusarr, - }, - } - - return vgNodeStatus, nil -} - -func (r *VGReconciler) updateStatus(ctx context.Context, vgStatus *lvmv1alpha1.VGStatus) error { - - vgNodeStatus, err := r.generateVolumeGroupNodeStatus(vgStatus) - - nodeStatus := &lvmv1alpha1.LVMVolumeGroupNodeStatus{ - ObjectMeta: metav1.ObjectMeta{ - Name: r.NodeName, - Namespace: r.Namespace, - }, - } - - if err != nil { - r.Log.Error(err, "failed to generate nodeStatus") - return err - } - - result, err := ctrl.CreateOrUpdate(ctx, r.Client, nodeStatus, func() error { - nodeStatus.Spec.LVMVGStatus = vgNodeStatus.Spec.LVMVGStatus - return nil - }) - - if err != nil { - r.Log.Error(err, "failed to create or update lvmvolumegroupnodestatus", "name", vgNodeStatus.Name) - return err - } else if result != controllerutil.OperationResultNone { - r.Log.Info("lvmvolumegroupnodestatus modified", "operation", result, "name", vgNodeStatus.Name) - } else { - r.Log.Info("lvmvolumegroupnodestatus unchanged") - } - return err -} - -// filterAvailableDevices returns: -// validDevices: the list of blockdevices considered available -// delayedDevices: the list of blockdevices considered available, but first observed less than 'minDeviceAge' time ago -// error -func (r *VGReconciler) filterAvailableDevices(blockDevices []internal.BlockDevice) ([]internal.BlockDevice, []internal.BlockDevice, error) { - var availableDevices, delayedDevices []internal.BlockDevice - // using a label so `continue DeviceLoop` can be used to skip devices -DeviceLoop: - for _, blockDevice := range blockDevices { - - // store device in deviceAgeMap - r.deviceAgeMap.storeDeviceAge(blockDevice.KName) - - // check for partitions recursively - if blockDevice.HasChildren() { - childAvailableDevices, childDelayedDevices, err := r.filterAvailableDevices(blockDevice.Children) - if err != nil { - return []internal.BlockDevice{}, []internal.BlockDevice{}, err - } - availableDevices = append(availableDevices, childAvailableDevices...) - delayedDevices = append(delayedDevices, childDelayedDevices...) - } - - devLogger := r.Log.WithValues("Device.Name", blockDevice.Name) - for name, filter := range FilterMap { - var valid bool - var err error - filterLogger := devLogger.WithValues("filter.Name", name) - valid, err = filter(blockDevice, r.executor) - if err != nil { - filterLogger.Error(err, "filter error") - valid = false - continue DeviceLoop - } else if !valid { - filterLogger.Info("does not match filter") - continue DeviceLoop - } - } - // check if the device is older than deviceMinAge - isOldEnough := r.deviceAgeMap.isOlderThan(blockDevice.KName) - if isOldEnough { - availableDevices = append(availableDevices, blockDevice) - } else { - delayedDevices = append(delayedDevices, blockDevice) - } - } - return availableDevices, delayedDevices, nil -}