Skip to content

Commit

Permalink
fix(controller): Optimize logic for detach VG from VM for cases with …
Browse files Browse the repository at this point in the history
…large number of VGs (#477)

* fix(controller): Fail waitForV4Task on context timeout and terminal states

The polling will now fail after 1 minute or if task returned has a terminal
state of FAILED or CANCELLED.

* parallelize waiting

* switch polling logic out of goroutines and check them all in a loop

* Stop waiting on detachVG altogether

* Add a log for the requeue
  • Loading branch information
thunderboltsid authored Aug 25, 2024
1 parent 5d63d09 commit 0883e55
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 88 deletions.
65 changes: 9 additions & 56 deletions controllers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ import (
"github.com/nutanix-cloud-native/prism-go-client/utils"
prismclientv3 "github.com/nutanix-cloud-native/prism-go-client/v3"
prismclientv4 "github.com/nutanix-cloud-native/prism-go-client/v4"
prismcommonconfig "github.com/nutanix/ntnx-api-golang-clients/prism-go-client/v4/models/common/v1/config"
prismapi "github.com/nutanix/ntnx-api-golang-clients/prism-go-client/v4/models/prism/v4/config"
prismconfig "github.com/nutanix/ntnx-api-golang-clients/volumes-go-client/v4/models/prism/v4/config"
volumesconfig "github.com/nutanix/ntnx-api-golang-clients/volumes-go-client/v4/models/volumes/v4/config"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/wait"
v1 "k8s.io/client-go/informers/core/v1"
"k8s.io/utils/ptr"
capiv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util/conditions"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -54,7 +52,7 @@ const (

gpuUnused = "UNUSED"

pollingInterval = time.Second * 2
detachVGRequeueAfter = 30 * time.Second
)

// DeleteVM deletes a VM and is invoked by the NutanixMachineReconciler
Expand Down Expand Up @@ -879,10 +877,10 @@ func getPrismCentralVersion(ctx context.Context, v3Client *prismclientv3.Client)
return *pcInfo.Resources.Version, nil
}

func detachVolumeGroupsFromVM(ctx context.Context, v4Client *prismclientv4.Client, vm *prismclientv3.VMIntentResponse) error {
func detachVolumeGroupsFromVM(ctx context.Context, v4Client *prismclientv4.Client, vmName string, vmUUID string, vmDiskList []*prismclientv3.VMDisk) error {
log := ctrl.LoggerFrom(ctx)
volumeGroupsToDetach := make([]string, 0)
for _, disk := range vm.Spec.Resources.DiskList {
for _, disk := range vmDiskList {
if disk.VolumeGroupReference == nil {
continue
}
Expand All @@ -892,66 +890,21 @@ func detachVolumeGroupsFromVM(ctx context.Context, v4Client *prismclientv4.Clien

// Detach the volume groups from the virtual machine
for _, volumeGroup := range volumeGroupsToDetach {
log.Info(fmt.Sprintf("detaching volume group %s from virtual machine %s", volumeGroup, *vm.Status.Name))
log.Info(fmt.Sprintf("detaching volume group %s from virtual machine %s", volumeGroup, vmName))
body := &volumesconfig.VmAttachment{
ExtId: vm.Metadata.UUID,
ExtId: ptr.To(vmUUID),
}

resp, err := v4Client.VolumeGroupsApiInstance.DetachVm(&volumeGroup, body)
if err != nil {
return fmt.Errorf("failed to detach volume group %s from virtual machine %s: %w", volumeGroup, *vm.Metadata.UUID, err)
return fmt.Errorf("failed to detach volume group %s from virtual machine %s: %w", volumeGroup, vmUUID, err)
}

data := resp.GetData()
task, ok := data.(prismconfig.TaskReference)
if !ok {
if _, ok := data.(prismconfig.TaskReference); !ok {
return fmt.Errorf("failed to cast response to TaskReference")
}

// Wait for the task to complete
if _, err := waitForTaskCompletionV4(ctx, v4Client, *task.ExtId); err != nil {
return fmt.Errorf("failed to wait for task %s to complete: %w", *task.ExtId, err)
}
}

return nil
}

// waitForTaskCompletionV4 waits for a task to complete and returns the completion details
// of the task. The function will poll the task status every 100ms until the task is
// completed or the context is cancelled.
func waitForTaskCompletionV4(ctx context.Context, v4Client *prismclientv4.Client, taskID string) ([]prismcommonconfig.KVPair, error) {
var data []prismcommonconfig.KVPair

if err := wait.PollUntilContextCancel(
ctx,
pollingInterval,
true,
func(ctx context.Context) (done bool, err error) {
task, err := v4Client.TasksApiInstance.GetTaskById(utils.StringPtr(taskID))
if err != nil {
return false, fmt.Errorf("failed to get task %s: %w", taskID, err)
}

taskData, ok := task.GetData().(prismapi.Task)
if !ok {
return false, fmt.Errorf("unexpected task data type %[1]T: %+[1]v", task.GetData())
}

if taskData.Status == nil {
return false, nil
}

if *taskData.Status != prismapi.TASKSTATUS_SUCCEEDED {
return false, nil
}

data = taskData.CompletionDetails

return true, nil
},
); err != nil {
return nil, fmt.Errorf("failed to wait for task %s to complete: %w", taskID, err)
}

return data, nil
}
19 changes: 12 additions & 7 deletions controllers/nutanixmachine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func (r *NutanixMachineReconciler) reconcileDelete(rctx *nctx.MachineContext) (r
}
log.V(1).Info(fmt.Sprintf("no running tasks anymore... Initiating delete for VM %s with UUID %s", vmName, vmUUID))
} else {
log.V(1).Info(fmt.Sprintf("no task UUID found on VM %s. Starting delete.", *vm.Spec.Name))
log.V(1).Info(fmt.Sprintf("no task UUID found on VM %s. Starting delete.", vmName))
}

var vgDetachNeeded bool
Expand All @@ -376,13 +376,18 @@ func (r *NutanixMachineReconciler) reconcileDelete(rctx *nctx.MachineContext) (r
}

if vgDetachNeeded {
if err := r.detachVolumeGroups(rctx, vm); err != nil {
if err := r.detachVolumeGroups(rctx, vmName, vmUUID, vm.Spec.Resources.DiskList); err != nil {
err := fmt.Errorf("failed to detach volume groups from VM %s with UUID %s: %v", vmName, vmUUID, err)
log.Error(err, "failed to detach volume groups from VM")
conditions.MarkFalse(rctx.NutanixMachine, infrav1.VMProvisionedCondition, infrav1.VolumeGroupDetachFailed, capiv1.ConditionSeverityWarning, err.Error())

return reconcile.Result{}, err
}

// Requeue to wait for volume group detach tasks to complete. This is done instead of blocking on task
// completion to avoid long-running reconcile loops.
log.Info(fmt.Sprintf("detaching volume groups from VM %s with UUID %s; requeueing again after %s", vmName, vmUUID, detachVGRequeueAfter))
return reconcile.Result{RequeueAfter: detachVGRequeueAfter}, nil
}

// Delete the VM since the VM was found (err was nil)
Expand All @@ -398,7 +403,7 @@ func (r *NutanixMachineReconciler) reconcileDelete(rctx *nctx.MachineContext) (r
return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}

func (r *NutanixMachineReconciler) detachVolumeGroups(rctx *nctx.MachineContext, vm *prismclientv3.VMIntentResponse) error {
func (r *NutanixMachineReconciler) detachVolumeGroups(rctx *nctx.MachineContext, vmName string, vmUUID string, vmDiskList []*prismclientv3.VMDisk) error {
createV4Client, err := isPrismCentralV4Compatible(rctx.Context, rctx.NutanixClient)
if err != nil {
return fmt.Errorf("error occurred while checking compatibility for Prism Central v4 APIs: %w", err)
Expand All @@ -413,8 +418,8 @@ func (r *NutanixMachineReconciler) detachVolumeGroups(rctx *nctx.MachineContext,
return fmt.Errorf("error occurred while fetching Prism Central v4 client: %w", err)
}

if err := detachVolumeGroupsFromVM(rctx.Context, v4Client, vm); err != nil {
return fmt.Errorf("failed to detach volume groups from VM %s with UUID %s: %w", rctx.Machine.Name, *vm.Metadata.UUID, err)
if err := detachVolumeGroupsFromVM(rctx.Context, v4Client, vmName, vmUUID, vmDiskList); err != nil {
return fmt.Errorf("failed to detach volume groups from VM %s with UUID %s: %w", vmName, vmUUID, err)
}

return nil
Expand Down Expand Up @@ -701,9 +706,9 @@ func (r *NutanixMachineReconciler) getOrCreateVM(rctx *nctx.MachineContext) (*pr
rctx.SetFailureStatus(capierrors.CreateMachineError, errorMsg)
return nil, errorMsg
}

log.Info(fmt.Sprintf("Waiting for task %s to get completed for VM %s", lastTaskUUID, rctx.NutanixMachine.Name))
err = nutanixclient.WaitForTaskToSucceed(ctx, v3Client, lastTaskUUID)
if err != nil {
if err := nutanixclient.WaitForTaskToSucceed(ctx, v3Client, lastTaskUUID); err != nil {
errorMsg := fmt.Errorf("error occurred while waiting for task %s to start: %v", lastTaskUUID, err)
rctx.SetFailureStatus(capierrors.CreateMachineError, errorMsg)
return nil, errorMsg
Expand Down
46 changes: 21 additions & 25 deletions controllers/nutanixmachine_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,11 @@ func TestNutanixMachineReconciler(t *testing.T) {
}
err := reconciler.detachVolumeGroups(&nctx.MachineContext{
NutanixClient: v3Client,
}, &prismclientv3.VMIntentResponse{
Metadata: &prismclientv3.Metadata{
UUID: &ntnxMachine.Status.VmUUID,
},
})
},
"",
ntnxMachine.Status.VmUUID,
[]*prismclientv3.VMDisk{},
)
g.Expect(err).To(HaveOccurred())
})

Expand All @@ -271,11 +271,10 @@ func TestNutanixMachineReconciler(t *testing.T) {
}
err := reconciler.detachVolumeGroups(&nctx.MachineContext{
NutanixClient: v3Client,
}, &prismclientv3.VMIntentResponse{
Metadata: &prismclientv3.Metadata{
UUID: &ntnxMachine.Status.VmUUID,
},
})
}, "",
ntnxMachine.Status.VmUUID,
[]*prismclientv3.VMDisk{},
)
g.Expect(err).To(HaveOccurred())
})

Expand All @@ -291,11 +290,10 @@ func TestNutanixMachineReconciler(t *testing.T) {
}
err := reconciler.detachVolumeGroups(&nctx.MachineContext{
NutanixClient: v3Client,
}, &prismclientv3.VMIntentResponse{
Metadata: &prismclientv3.Metadata{
UUID: &ntnxMachine.Status.VmUUID,
},
})
}, "",
ntnxMachine.Status.VmUUID,
[]*prismclientv3.VMDisk{},
)
g.Expect(err).To(HaveOccurred())
})

Expand All @@ -311,11 +309,10 @@ func TestNutanixMachineReconciler(t *testing.T) {
}
err := reconciler.detachVolumeGroups(&nctx.MachineContext{
NutanixClient: v3Client,
}, &prismclientv3.VMIntentResponse{
Metadata: &prismclientv3.Metadata{
UUID: &ntnxMachine.Status.VmUUID,
},
})
}, "",
ntnxMachine.Status.VmUUID,
[]*prismclientv3.VMDisk{},
)
g.Expect(err).To(HaveOccurred())
})

Expand All @@ -331,11 +328,10 @@ func TestNutanixMachineReconciler(t *testing.T) {
}
err := reconciler.detachVolumeGroups(&nctx.MachineContext{
NutanixClient: v3Client,
}, &prismclientv3.VMIntentResponse{
Metadata: &prismclientv3.Metadata{
UUID: &ntnxMachine.Status.VmUUID,
},
})
}, "",
ntnxMachine.Status.VmUUID,
[]*prismclientv3.VMDisk{},
)
g.Expect(err).To(Not(HaveOccurred()))
})
})
Expand Down

0 comments on commit 0883e55

Please sign in to comment.