Skip to content

Commit

Permalink
Merge pull request #135 from nbalacha/vg-refactor
Browse files Browse the repository at this point in the history
fix: refactors vgmanager-controller code
  • Loading branch information
openshift-merge-robot authored Mar 28, 2022
2 parents fc611e4 + f546c9b commit dea7ead
Showing 1 changed file with 107 additions and 84 deletions.
191 changes: 107 additions & 84 deletions pkg/vgmanager/vgmanager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/topolvm/topolvm/lvmd"
lvmdCMD "github.com/topolvm/topolvm/pkg/lvmd/cmd"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -67,30 +68,17 @@ type VGReconciler struct {
func (r *VGReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.Log = log.FromContext(ctx).WithName(ControllerName)
r.Log.Info("reconciling", "lvmvolumegroup", req)
r.executor = &internal.CommandExecutor{}
res, err := r.reconcile(ctx, req)
if err != nil {
r.Log.Error(err, "reconcile error")
}
r.Log.Info("reconcile complete", "result", res)
return res, err

}

var reconcileInterval = time.Minute * 1
var reconcileAgain ctrl.Result = ctrl.Result{Requeue: true, RequeueAfter: reconcileInterval}

//TODO: Refactor this function to move the ctrl result to a single place

func (r *VGReconciler) reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {

//Check if this lvmvolumegroup needs to be processed on this node
volumeGroup := &lvmv1alpha1.LVMVolumeGroup{}
err := r.Client.Get(ctx, req.NamespacedName, volumeGroup)
if err != nil {
r.Log.Error(err, "failed to get LVMVolumeGroup", "VGName", req.Name)
if !errors.IsNotFound(err) {
return reconcileAgain, err
}
return ctrl.Result{}, err
}

//Check if the VG nodeSelector matches the labels on this node
nodeMatches, err := r.matchesThisNode(ctx, volumeGroup.Spec.NodeSelector)
if err != nil {
Expand All @@ -100,68 +88,67 @@ func (r *VGReconciler) reconcile(ctx context.Context, req ctrl.Request) (ctrl.Re

if !nodeMatches {
//Nothing to be done on this node for the VG.
r.Log.Info("node does not match selector", "VGName", volumeGroup.Name)
r.Log.Info("node labels do not match the selector", "VGName", volumeGroup.Name)
return ctrl.Result{}, nil
}

r.Log.Info("listing block devices", "VGName", volumeGroup.Name)

// list block devices
blockDevices, err := internal.ListBlockDevices(r.executor)
r.executor = &internal.CommandExecutor{}
res, err := r.reconcile(ctx, req, volumeGroup)
if err != nil {
return reconcileAgain, fmt.Errorf("failed to list block devices: %v", err)
r.Log.Error(err, "reconcile error", "lvmvolumegroup", req.Name)
}
r.Log.Info("reconcile complete", "result", res)
return res, err

// filter out block devices
remainingValidDevices, delayedDevices, err := r.filterAvailableDevices(blockDevices)
if err != nil {
_ = err
}
}

var matchingDevices []internal.BlockDevice
_, matchingDevices, err = filterMatchingDevices(remainingValidDevices, volumeGroup)
var reconcileInterval = time.Minute * 1
var reconcileAgain ctrl.Result = ctrl.Result{Requeue: true, RequeueAfter: reconcileInterval}

//TODO: Refactor this function to move the ctrl result to a single place

func (r *VGReconciler) reconcile(ctx context.Context, req ctrl.Request, volumeGroup *lvmv1alpha1.LVMVolumeGroup) (ctrl.Result, error) {

//Get the block devices that can be used for this volumegroup
matchingDevices, delayedDevices, err := r.getMatchingDevicesForVG(volumeGroup)
if err != nil {
r.Log.Error(err, "could not filter matching devices", "VGName", volumeGroup.Name)
// Failed to get devices for this vg. Reconcile again.
r.Log.Error(err, "failed to get block devices for volumegroup", "VGName", volumeGroup.Name)
return reconcileAgain, err
}

status := &lvmv1alpha1.VGStatus{
Name: req.Name,
Status: lvmv1alpha1.VGStatusReady,
Reason: "",
}

existingLvmdConfig := &lvmdCMD.Config{}

// load lvmd config
lvmdConfig := &lvmdCMD.Config{
SocketName: controllers.DefaultLVMdSocket,
//Read the lvmd config file
lvmdConfig, err := loadLVMDConfig()
if err != nil {
//Failed to read lvmdconfig file. Reconcile again
r.Log.Error(err, "failed to read the lvmd config file")
return reconcileAgain, err
}

cfgBytes, err := os.ReadFile(controllers.LvmdConfigFile)
if os.IsNotExist(err) {
if lvmdConfig == nil {
// The lvmdconfig file does not exist and will be created.
r.Log.Info("lvmd config file doesn't exist, will create")
} else if err != nil {
return reconcileAgain, err
} else {
err = yaml.Unmarshal(cfgBytes, &lvmdConfig)
if err != nil {
return reconcileAgain, err
lvmdConfig = &lvmdCMD.Config{
SocketName: controllers.DefaultLVMdSocket,
}
existingLvmdConfig = lvmdConfig
}
existingLvmdConfig := *lvmdConfig

// avoid having to iterate through device classes multiple times, map from name to config index
//To avoid having to iterate through device classes multiple times, map from name to config index
deviceClassMap := make(map[string]int)
for i, deviceClass := range lvmdConfig.DeviceClasses {
deviceClassMap[deviceClass.Name] = i
}

status := &lvmv1alpha1.VGStatus{
Name: req.Name,
Status: lvmv1alpha1.VGStatusReady,
Reason: "",
}
_, found := deviceClassMap[volumeGroup.Name]
if found {
volGrpHostInfo, err := GetVolumeGroup(r.executor, volumeGroup.Name)
if err != nil {
r.Log.Error(err, "failed to get volume group", "name", volumeGroup.Name)
r.Log.Error(err, "failed to get volume group from the host", "name", volumeGroup.Name)
} else {
status.Devices = volGrpHostInfo.PVs
}
Expand All @@ -171,17 +158,16 @@ func (r *VGReconciler) reconcile(ctx context.Context, req ctrl.Request) (ctrl.Re
r.Log.Info("no matching devices", "VGName", volumeGroup.Name)
if len(delayedDevices) > 0 {
return reconcileAgain, nil
} else {
if found {
// Update the status again just to be safe.
if statuserr := r.updateStatus(ctx, status, volumeGroup); statuserr != nil {
r.Log.Error(statuserr, "failed to update status", "VGName", volumeGroup.Name)
return reconcileAgain, nil
}
}

if found {
// Update the status again just to be safe.
if statuserr := r.updateStatus(ctx, status, volumeGroup); statuserr != nil {
r.Log.Error(statuserr, "failed to update status", "VGName", volumeGroup.Name)
return reconcileAgain, nil
}
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
}

// create/extend VG and update lvmd config
Expand Down Expand Up @@ -212,28 +198,24 @@ func (r *VGReconciler) reconcile(ctx context.Context, req ctrl.Request) (ctrl.Re

// apply and save lvmconfig
// pass config to configChannel only if config has changed
if !cmp.Equal(existingLvmdConfig, lvmdConfig) {
r.Log.Info("updating lvmd config")
out, err := yaml.Marshal(lvmdConfig)
if err == nil {
err = os.WriteFile(controllers.LvmdConfigFile, out, 0600)
}

if !cmp.Equal(existingLvmdConfig, lvmdConfig) {
err := saveLVMDConfig(lvmdConfig)
if err != nil {
r.Log.Error(err, "failed to update lvmd.conf file", "VGName", volumeGroup.Name)
return reconcileAgain, err
}
r.Log.Info("updated lvmd config", "VGName", volumeGroup.Name)
}

volGrpHostInfo, err := GetVolumeGroup(r.executor, volumeGroup.Name)
if err == nil {
status.Devices = volGrpHostInfo.PVs
if statuserr := r.updateStatus(ctx, status, volumeGroup); statuserr != nil {
r.Log.Error(statuserr, "failed to update status", "VGName", volumeGroup.Name)
}
} else {
r.Log.Error(err, "failed to get volume group", "name", volumeGroup.Name)
}

if statuserr := r.updateStatus(ctx, status, volumeGroup); statuserr != nil {
r.Log.Error(statuserr, "failed to update status", "VGName", volumeGroup.Name)
r.Log.Error(err, "failed to get volume group from the host", "name", volumeGroup.Name)
}

// requeue faster if some devices are too recently observed to consume
Expand Down Expand Up @@ -303,17 +285,6 @@ func NodeSelectorMatchesNodeLabels(node *corev1.Node, nodeSelector *corev1.NodeS
return matches, err
}

func ToleratesTaints(tolerations []corev1.Toleration, taints []corev1.Taint) bool {
for _, t := range taints {
taint := t
toleratesTaint := corev1helper.TolerationsTolerateTaint(tolerations, &taint)
if !toleratesTaint {
return false
}
}
return true
}

func setStatus(status *lvmv1alpha1.VGStatus, instance *lvmv1alpha1.LVMVolumeGroupNodeStatus) {
found := false

Expand Down Expand Up @@ -390,3 +361,55 @@ func (r *VGReconciler) matchesThisNode(ctx context.Context, selector *corev1.Nod
}
return NodeSelectorMatchesNodeLabels(node, selector)
}

func loadLVMDConfig() (*lvmdCMD.Config, error) {

cfgBytes, err := os.ReadFile(controllers.LvmdConfigFile)
if os.IsNotExist(err) {
//If the file does not exist, return nil for both
return nil, nil
} else if err != nil {
return nil, err
} else {
lvmdconfig := &lvmdCMD.Config{}
err = yaml.Unmarshal(cfgBytes, lvmdconfig)
if err != nil {
return nil, err
}
return lvmdconfig, nil
}
}

func saveLVMDConfig(lvmdConfig *lvmdCMD.Config) error {
out, err := yaml.Marshal(lvmdConfig)
if err == nil {
err = os.WriteFile(controllers.LvmdConfigFile, out, 0600)
}
return err
}

func (r *VGReconciler) getMatchingDevicesForVG(volumeGroup *lvmv1alpha1.LVMVolumeGroup) (matching []internal.BlockDevice, delayed []internal.BlockDevice, err error) {
// The LVMVolumeGroup was created/modified
r.Log.Info("getting block devices for volumegroup", "VGName", volumeGroup.Name)

// list block devices
blockDevices, err := internal.ListBlockDevices(r.executor)
if err != nil {
return nil, nil, fmt.Errorf("failed to list block devices: %v", err)
}

// filter out block devices
remainingValidDevices, delayedDevices, err := r.filterAvailableDevices(blockDevices)
if err != nil {
_ = err
}

var matchingDevices []internal.BlockDevice
_, matchingDevices, err = filterMatchingDevices(remainingValidDevices, volumeGroup)
if err != nil {
r.Log.Error(err, "could not filter matching devices", "VGName", volumeGroup.Name)
return nil, nil, err
}

return matchingDevices, delayedDevices, nil
}

0 comments on commit dea7ead

Please sign in to comment.