Skip to content

Commit

Permalink
[ISSUE-1246]: fix race condition when many volumes realeased (#1247)
Browse files Browse the repository at this point in the history
* [ISSUE-1246]: fix race condition when many volumes realeased

Signed-off-by: Dawid Korzepa <[email protected]>

* [ISSUE-1246]: add drive field to the log

Signed-off-by: Dawid Korzepa <[email protected]>

---------

Signed-off-by: Dawid Korzepa <[email protected]>
  • Loading branch information
korzepadawid authored Nov 28, 2024
1 parent d8c2877 commit 47da5e4
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
11 changes: 6 additions & 5 deletions pkg/node/volumemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,19 +375,16 @@ func (m *VolumeManager) updateVolumeAndDriveUsageStatus(ctx context.Context, vol
ll.Errorf("Unable to read drive CR, error: %v", err)
return ctrl.Result{Requeue: true}, err
}
if volumeStatus == apiV1.VolumeUsageReleased {
m.addVolumeStatusAnnotation(drive, volume.Name, apiV1.VolumeUsageReleased)
}
if drive != nil {
if err := m.retryDriveUpdate(ctx, volume, drive, driveStatus); err != nil {
if err := m.retryDriveUpdate(ctx, volume, volumeStatus, drive, driveStatus); err != nil {
ll.Errorf("Unable to change drive %s usage status to %s, error: %v.", drive.Name, drive.Spec.Usage, err)
return ctrl.Result{Requeue: true}, err
}
}
return ctrl.Result{}, nil
}

func (m *VolumeManager) retryDriveUpdate(ctx context.Context, volume *volumecrd.Volume, drive *drivecrd.Drive, driveStatus string) error {
func (m *VolumeManager) retryDriveUpdate(ctx context.Context, volume *volumecrd.Volume, volumeStatus string, drive *drivecrd.Drive, driveStatus string) error {
ll := m.log.WithFields(logrus.Fields{
"method": "retryDriveUpdate",
"volumeID": volume.Name,
Expand All @@ -399,6 +396,9 @@ func (m *VolumeManager) retryDriveUpdate(ctx context.Context, volume *volumecrd.
}
for i := 0; i < numberOfRetries; i++ {
drive.Spec.Usage = driveStatus
if volumeStatus == apiV1.VolumeUsageReleased {
m.addVolumeStatusAnnotation(drive, volume.Name, apiV1.VolumeUsageReleased)
}
if err := m.k8sClient.UpdateCR(ctx, drive); err != nil {
ll.Infof("Retrying to update drive %s usage status to %s. Retry number: %d. Sleep %d seconds and retry ...",
drive.Name, drive.Spec.Usage, i, delayBeforeRetry)
Expand Down Expand Up @@ -1263,6 +1263,7 @@ func (m *VolumeManager) isRootMountpoint(devs []lsblk.BlockDevice) bool {

// addVolumeStatusAnnotation add annotation with volume status to drive
func (m *VolumeManager) addVolumeStatusAnnotation(drive *drivecrd.Drive, volumeName, status string) {
m.log.WithField("drive", drive.Name).Infof("Adding volume status annotation %s: %s", volumeName, status)
annotationKey := fmt.Sprintf("%s/%s", apiV1.DriveAnnotationVolumeStatusPrefix, volumeName)
// init map if empty
if drive.Annotations == nil {
Expand Down
18 changes: 13 additions & 5 deletions pkg/node/volumemgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2135,14 +2135,18 @@ func TestVolumeManager_retryDriveUpdate(t *testing.T) {
testVol.Spec.CSIStatus = apiV1.Removing
assert.Nil(t, vm.k8sClient.CreateCR(testCtx, testVol.Name, testVol))
testDrive := testDriveCR.DeepCopy()
assert.Nil(t, vm.k8sClient.CreateCR(testCtx, testVol.Spec.Location, testDrive))
err := vm.retryDriveUpdate(context.TODO(), testVol, testDrive, "FAILED")
assert.Equal(t, 1, len(recorderMock.Calls))
assert.Nil(t, vm.k8sClient.CreateCR(testCtx, testVol.Spec.Location, testDrive))
err := vm.retryDriveUpdate(context.TODO(), testVol, apiV1.VolumeUsageReleased, testDrive, "FAILED")
assert.Equal(t, 1, len(recorderMock.Calls))
assert.Equal(t, eventing.DriveRemovalFailed, recorderMock.Calls[0].Event)
assert.Nil(t, err)
drive := &drivecrd.Drive{}
assert.Nil(t, vm.k8sClient.ReadCR(testCtx, testDrive.Name, testDrive.Namespace, drive))
assert.Equal(t, apiV1.Failed, drive.Spec.Usage)

annotationValue, annotationExist := drive.GetAnnotations()["status/"+testVol.Name]
assert.True(t, annotationExist)
assert.Equal(t, apiV1.VolumeUsageReleased, annotationValue)
})

t.Run("success retry", func(t *testing.T) {
Expand All @@ -2153,11 +2157,15 @@ func TestVolumeManager_retryDriveUpdate(t *testing.T) {
testDrive := testDriveCR.DeepCopy()
testDrive2 := testDriveCR.DeepCopy()
assert.Nil(t, vm.k8sClient.CreateCR(testCtx, testVol.Spec.Location, testDrive))
err := vm.retryDriveUpdate(context.TODO(), testVol, testDrive2, "FAILED")
err := vm.retryDriveUpdate(context.TODO(), testVol, apiV1.VolumeUsageReleased, testDrive2, "FAILED")
assert.Nil(t, err)
drive := &drivecrd.Drive{}
assert.Nil(t, vm.k8sClient.ReadCR(testCtx, testDrive.Name, testDrive.Namespace, drive))
assert.Equal(t, apiV1.Failed, drive.Spec.Usage)

annotationValue, annotationExist := drive.GetAnnotations()["status/"+testVol.Name]
assert.True(t, annotationExist)
assert.Equal(t, apiV1.VolumeUsageReleased, annotationValue)
})

t.Run("failed update", func(t *testing.T) {
Expand All @@ -2166,7 +2174,7 @@ func TestVolumeManager_retryDriveUpdate(t *testing.T) {
testVol.Spec.CSIStatus = apiV1.Removing
testDrive := testDriveCR.DeepCopy()
assert.Nil(t, vm.k8sClient.CreateCR(testCtx, testVol.Name, testVol))
err := vm.retryDriveUpdate(context.TODO(), testVol, testDrive, "RELEASED")
err := vm.retryDriveUpdate(context.TODO(), testVol, apiV1.VolumeUsageReleased, testDrive, "RELEASED")
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "not found")
})
Expand Down

0 comments on commit 47da5e4

Please sign in to comment.