Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE-1051] Handle Kubelet's Wrong CSI Call Inconsistent with Real Volume Status #1050

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
62 changes: 56 additions & 6 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,10 +353,15 @@
}

currStatus := volumeCR.Spec.CSIStatus
if currStatus == apiV1.Created {
switch currStatus {
case apiV1.Failed:
ll.Warningf("Volume status: %s. Need to retry.", currStatus)
case apiV1.Created:
ll.Info("Volume has been already unstaged")
return &csi.NodeUnstageVolumeResponse{}, nil
} else if currStatus != apiV1.VolumeReady {
case apiV1.VolumeReady:
ll.Infof("Expected volume status: %s", currStatus)
default:
msg := fmt.Sprintf("current volume CR status - %s, expected to be in [%s, %s]",
currStatus, apiV1.Created, apiV1.VolumeReady)
ll.Error(msg)
Expand Down Expand Up @@ -468,8 +473,9 @@
}

currStatus := volumeCR.Spec.CSIStatus
// if currStatus not in [VolumeReady, Published]
if currStatus != apiV1.VolumeReady && currStatus != apiV1.Published {
if currStatus == apiV1.Failed {
ll.Warningf("Volume status: %s. Need to retry.", currStatus)
} else if currStatus != apiV1.VolumeReady && currStatus != apiV1.Published {
msg := fmt.Sprintf("current volume CR status - %s, expected to be in [%s, %s]",
currStatus, apiV1.VolumeReady, apiV1.Published)
ll.Error(msg)
Expand All @@ -488,6 +494,46 @@
resp, errToReturn = nil, fmt.Errorf("failed to publish volume: fake attach error %s", err.Error())
}
} else {
// will check whether srcPath is mounted, if not, need to redo NodeStageVolume
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a little concerned about this process, as some mount logic(like global path setup) is done in kubelet. there redo nodeStage may not help in such case. do we have a test can prove that this will help in the failure case?

Copy link
Collaborator Author

@CraneShiEMC CraneShiEMC Aug 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The kubelet issued wrong CSI call NodePublishVolume in this case assumed that the volume's device global path's mountpoint has been successfully setup in some successful previous CSI call NodeStageVolume. But acutally the volume's device global path has been unmounted in the possible forceful node-removal and kubelet cannot successfully sync the volume's real status because of its continually-failed "orphan" pod's volume cleanup by calling CSI NodeUnpublish when CSI pods have not been intialized yet.

In spite of this, the device global path still exists there if the volume still exists in the most cases. Even though the device global path has also been removed in the worst case, CSI current logic can still create the device global path itself.

I will do the functional test on this code change to verify it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've similuated the scenario of volume's k8s global device mountpoint missing in my standalone test. This CSI defensive enhancement can work well as expected in the test.

srcMounted, err := s.fsOps.IsMounted(srcPath)
if err != nil {
errMsg := fmt.Sprintf("execute IsMounted on %s with error: %s", srcPath, err.Error())
ll.Error(errMsg)
return nil, fmt.Errorf("failed to publish volume: %s", errMsg)
}
if !srcMounted {
ll.Warnf("staging path %s is not mounted! need to redo NodeStageVolume!", srcPath)
nodeStageReq := &csi.NodeStageVolumeRequest{
VolumeId: volumeID,
StagingTargetPath: req.GetStagingTargetPath(),
VolumeCapability: req.GetVolumeCapability(),
}

// unlock volume to redo NodeStageVolume
err := s.volMu.UnlockKey(req.GetVolumeId())
if err != nil {
errMsg := fmt.Sprintf("unlock volume %s to redo NodeStageVolume with error: %s", volumeID, err.Error())
ll.Error(errMsg)
return nil, fmt.Errorf("failed to publish volume: %s", errMsg)
}

Check warning on line 518 in pkg/node/node.go

View check run for this annotation

Codecov / codecov/patch

pkg/node/node.go#L515-L518

Added lines #L515 - L518 were not covered by tests
nodeStageResp, err := s.NodeStageVolume(ctx, nodeStageReq)

// re-lock the volume to proceed NodePublishVolume
s.volMu.LockKey(req.GetVolumeId())

if nodeStageResp == nil && err != nil {
errMsg := fmt.Sprintf("redo NodeStageVolume on volume %s with error: %s", volumeID, err.Error())
ll.Error(errMsg)
return nil, fmt.Errorf("failed to publish volume: %s", errMsg)
}

// update the content of volume
volumeCR, err = s.crHelper.GetVolumeByID(volumeID)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("unable to get updated volume %s", volumeID))
}

Check warning on line 534 in pkg/node/node.go

View check run for this annotation

Codecov / codecov/patch

pkg/node/node.go#L533-L534

Added lines #L533 - L534 were not covered by tests
}

_, isBlock := req.GetVolumeCapability().GetAccessType().(*csi.VolumeCapability_Block)
if err := s.fsOps.PrepareAndPerformMount(srcPath, dstPath, isBlock, !isBlock, mountOptions...); err != nil {
ll.Errorf("Unable to mount volume: %v", err)
Expand Down Expand Up @@ -554,8 +600,9 @@
}

currStatus := volumeCR.Spec.CSIStatus
// if currStatus not in [VolumeReady, Published]
if currStatus != apiV1.VolumeReady && currStatus != apiV1.Published {
if currStatus == apiV1.Failed {
ll.Warningf("Volume status: %s. Need to retry.", currStatus)
} else if currStatus != apiV1.VolumeReady && currStatus != apiV1.Published {
msg := fmt.Sprintf("current volume CR status - %s, expected to be in [%s, %s]",
currStatus, apiV1.VolumeReady, apiV1.Published)
ll.Error(msg)
Expand Down Expand Up @@ -602,6 +649,9 @@
volumeCR.Spec.Owners = owners
if len(volumeCR.Spec.Owners) == 0 {
volumeCR.Spec.CSIStatus = apiV1.VolumeReady
} else {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems this is not needed, the default state is Published in this function.

Copy link
Collaborator Author

@CraneShiEMC CraneShiEMC Aug 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we will proceed on the failed volume now, here the set of status to Published is for Failed volume successfully unpublished and used by multiple pods.

// ensure the Published status of volume in the successful processing
volumeCR.Spec.CSIStatus = apiV1.Published
}
if updateErr := s.k8sClient.UpdateCR(ctxWithID, volumeCR); updateErr != nil {
ll.Errorf("Unable to set volume CR status to VolumeReady: %v", updateErr)
Expand Down
101 changes: 90 additions & 11 deletions pkg/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ var _ = Describe("CSINodeService NodePublish()", func() {
req := getNodePublishRequest(testV1ID, targetPath, *testVolumeCap)
req.VolumeContext[util.PodNameKey] = testPod1Name

fsOps.On("PrepareAndPerformMount",
path.Join(req.GetStagingTargetPath(), stagingFileName), req.GetTargetPath(), false, true).
Return(nil)
srcPath := path.Join(req.GetStagingTargetPath(), stagingFileName)
fsOps.On("PrepareAndPerformMount", srcPath, req.GetTargetPath(), false, true).Return(nil)
fsOps.On("IsMounted", srcPath).Return(true, nil)

resp, err := node.NodePublishVolume(testCtx, req)
Expect(resp).NotTo(BeNil())
Expand All @@ -101,7 +101,9 @@ var _ = Describe("CSINodeService NodePublish()", func() {
Expect(err).To(BeNil())
Expect(volumeCR.Spec.Owners[0]).To(Equal(testPod1Name))

// publish again such volume
// publish again such volume in Failed Status
volumeCR.Spec.CSIStatus = apiV1.Failed
err = node.k8sClient.UpdateCR(testCtx, volumeCR)
resp, err = node.NodePublishVolume(testCtx, req)
Expect(resp).NotTo(BeNil())
Expect(err).To(BeNil())
Expand All @@ -111,6 +113,29 @@ var _ = Describe("CSINodeService NodePublish()", func() {
err = node.k8sClient.ReadCR(testCtx, testV1ID, "", volumeCR)
Expect(err).To(BeNil())
Expect(len(volumeCR.Spec.Owners)).To(Equal(1))
Expect(volumeCR.Spec.CSIStatus).To(Equal(apiV1.Published))
})
It("Should publish volume successfully with stagingPath unmounted", func() {
req := getNodePublishRequest(testV1ID, targetPath, *testVolumeCap)
req.VolumeContext[util.PodNameKey] = testPod1Name

partitionPath := "/partition/path/for/volume1"
stagingPath := path.Join(req.GetStagingTargetPath(), stagingFileName)
prov.On("GetVolumePath", &testVolume1).Return(partitionPath, nil)
fsOps.On("PrepareAndPerformMount", partitionPath, stagingPath, true, false).Return(nil)

fsOps.On("PrepareAndPerformMount", stagingPath, req.GetTargetPath(), false, true).Return(nil)
fsOps.On("IsMounted", stagingPath).Return(false, nil)

resp, err := node.NodePublishVolume(testCtx, req)
Expect(resp).NotTo(BeNil())
Expect(err).To(BeNil())

// check owner appearance
volumeCR := &vcrd.Volume{}
err = node.k8sClient.ReadCR(testCtx, testV1ID, "", volumeCR)
Expect(err).To(BeNil())
Expect(volumeCR.Spec.Owners[0]).To(Equal(testPod1Name))
})
})

Expand Down Expand Up @@ -157,12 +182,12 @@ var _ = Describe("CSINodeService NodePublish()", func() {
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(ContainSubstring("Staging Path missing in request"))
})
It("Should fail, because Volume has failed status", func() {
It("Should fail, because Volume has unexpected status", func() {
req := getNodePublishRequest(testV1ID, targetPath, *testVolumeCap)
vol1 := &vcrd.Volume{}
err := node.k8sClient.ReadCR(testCtx, testVolume1.Id, testNs, vol1)
Expect(err).To(BeNil())
vol1.Spec.CSIStatus = apiV1.Failed
vol1.Spec.CSIStatus = apiV1.Creating
err = node.k8sClient.UpdateCR(testCtx, vol1)
Expect(err).To(BeNil())

Expand All @@ -184,15 +209,44 @@ var _ = Describe("CSINodeService NodePublish()", func() {
It("Should fail, because of PrepareAndPerformMount failed", func() {
req := getNodePublishRequest(testV1ID, targetPath, *testVolumeCap)

fsOps.On("PrepareAndPerformMount",
path.Join(req.GetStagingTargetPath(), stagingFileName), req.GetTargetPath(), false, true).
Return(errors.New("error mount"))
srcPath := path.Join(req.GetStagingTargetPath(), stagingFileName)
fsOps.On("PrepareAndPerformMount", srcPath, req.GetTargetPath(), false, true).Return(errors.New("error mount"))
fsOps.On("IsMounted", srcPath).Return(true, nil)

resp, err := node.NodePublishVolume(testCtx, req)
Expect(resp).To(BeNil())
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(ContainSubstring("mount error"))
})
It("Should fail, because the check on whether stagingPath is mounted failed", func() {
req := getNodePublishRequest(testV1ID, targetPath, *testVolumeCap)

srcPath := path.Join(req.GetStagingTargetPath(), stagingFileName)
errMsg := fmt.Sprintf("unable to check whether %s is mounted", srcPath)
fsOps.On("IsMounted", srcPath).Return(false, errors.New(errMsg))

resp, err := node.NodePublishVolume(testCtx, req)
Expect(resp).To(BeNil())
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(ContainSubstring(errMsg))
})
It("Should fail, because redoing NodeStageVolume when stagingPath unmounted failed", func() {
req := getNodePublishRequest(testV1ID, targetPath, *testVolumeCap)
req.VolumeContext[util.PodNameKey] = testPod1Name

partitionPath := "/partition/path/for/volume1"
stagingPath := path.Join(req.GetStagingTargetPath(), stagingFileName)
prov.On("GetVolumePath", &testVolume1).Return(partitionPath, nil)
fsOps.On("PrepareAndPerformMount", partitionPath, stagingPath, true, false).Return(errors.New("mount error"))
fsOps.On("IsMounted", stagingPath).Return(false, nil)

redoErr := fmt.Sprintf("redo NodeStageVolume on volume %s with error", testV1ID)

resp, err := node.NodePublishVolume(testCtx, req)
Expect(resp).To(BeNil())
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(ContainSubstring(redoErr))
})
})
})

Expand Down Expand Up @@ -364,6 +418,19 @@ var _ = Describe("CSINodeService NodeUnPublish()", func() {
Expect(err).To(BeNil())
Expect(volumeCR.Spec.CSIStatus).To(Equal(apiV1.VolumeReady))
Expect(volumeCR.Spec.Owners).To(BeNil())

// unpublish again on failed volume
volumeCR.Spec.CSIStatus = apiV1.Failed
err = node.k8sClient.UpdateCR(testCtx, volumeCR)
Expect(err).To(BeNil())

resp, err = node.NodeUnpublishVolume(testCtx, req)
Expect(resp).NotTo(BeNil())
Expect(err).To(BeNil())

err = node.k8sClient.ReadCR(testCtx, testV1ID, "", volumeCR)
Expect(err).To(BeNil())
Expect(volumeCR.Spec.CSIStatus).To(Equal(apiV1.VolumeReady))
})
It("Should unpublish volume and don't change volume CR status", func() {
req := getNodeUnpublishRequest(testV1ID, targetPath1)
Expand Down Expand Up @@ -506,6 +573,18 @@ var _ = Describe("CSINodeService NodeUnStage()", func() {
err = node.k8sClient.ReadCR(testCtx, testV1ID, "", volumeCR)
Expect(err).To(BeNil())
Expect(volumeCR.Spec.CSIStatus).To(Equal(apiV1.Created))

// retry unstage on failed volume
volumeCR.Spec.CSIStatus = apiV1.Failed
err = node.k8sClient.UpdateCR(testCtx, volumeCR)
Expect(err).To(BeNil())
resp, err = node.NodeUnstageVolume(testCtx, req)
Expect(resp).NotTo(BeNil())
Expect(err).To(BeNil())

err = node.k8sClient.ReadCR(testCtx, testV1ID, "", volumeCR)
Expect(err).To(BeNil())
Expect(volumeCR.Spec.CSIStatus).To(Equal(apiV1.Created))
})
})

Expand Down Expand Up @@ -564,12 +643,12 @@ var _ = Describe("CSINodeService NodeUnStage()", func() {
Expect(volumeCR.Spec.CSIStatus).To(Equal(apiV1.Failed))
})

It("Should failed, because Volume has failed status", func() {
It("Should failed, because Volume has unexpected status", func() {
req := getNodeUnstageRequest(testV1ID, targetPath)
vol1 := &vcrd.Volume{}
err := node.k8sClient.ReadCR(testCtx, testVolume1.Id, testNs, vol1)
Expect(err).To(BeNil())
vol1.Spec.CSIStatus = apiV1.Failed
vol1.Spec.CSIStatus = apiV1.Creating
err = node.k8sClient.UpdateCR(testCtx, vol1)
Expect(err).To(BeNil())

Expand Down
Loading