From 30638635fb022b8fee9aaa1b95c920274308b4cc Mon Sep 17 00:00:00 2001 From: Jan Safranek Date: Fri, 10 Nov 2023 12:17:18 +0100 Subject: [PATCH] Re-queue SnapshotContents that are readyToUse: false SnapshotContents with readyToUse: false should be periodically requeued with exp. backoff until the CSI driver confirms the snapshot is ready. --- pkg/sidecar-controller/content_create_test.go | 92 +++++++++++++++++++ pkg/sidecar-controller/framework_test.go | 29 ++++-- pkg/sidecar-controller/snapshot_controller.go | 50 +++++++--- .../snapshot_controller_base.go | 48 ++++++---- 4 files changed, 174 insertions(+), 45 deletions(-) diff --git a/pkg/sidecar-controller/content_create_test.go b/pkg/sidecar-controller/content_create_test.go index 8c1de931c..a58ac80df 100644 --- a/pkg/sidecar-controller/content_create_test.go +++ b/pkg/sidecar-controller/content_create_test.go @@ -198,6 +198,98 @@ func TestSyncContent(t *testing.T) { errors: noerrors, test: testSyncContent, }, + { + name: "1-7: Just created un-ready snapshot should be requeued", + // A new snapshot should be created + initialContents: withContentStatus(newContentArray("content1-7", "snapuid1-7", "snap1-7", "sid1-7", defaultClass, "", "volume-handle-1-7", retainPolicy, nil, &defaultSize, true), + nil), + expectedContents: withContentAnnotations(withContentStatus(newContentArray("content1-7", "snapuid1-7", "snap1-7", "sid1-7", defaultClass, "", "volume-handle-1-7", retainPolicy, nil, &defaultSize, true), + &crdv1.VolumeSnapshotContentStatus{SnapshotHandle: toStringPointer("snapuid1-7"), RestoreSize: &defaultSize, ReadyToUse: &False}), + map[string]string{}), + expectedEvents: noevents, + expectedCreateCalls: []createCall{ + { + volumeHandle: "volume-handle-1-7", + snapshotName: "snapshot-snapuid1-7", + driverName: mockDriverName, + snapshotId: "snapuid1-7", + parameters: map[string]string{ + utils.PrefixedVolumeSnapshotNameKey: "snap1-7", + utils.PrefixedVolumeSnapshotNamespaceKey: "default", + utils.PrefixedVolumeSnapshotContentNameKey: "content1-7", + }, + creationTime: timeNow, + readyToUse: false, + size: defaultSize, + }, + }, + errors: noerrors, + expectRequeue: true, + expectSuccess: true, + test: testSyncContent, + }, + { + name: "1-8: Un-ready snapshot that remains un-ready should be requeued", + // An un-ready snapshot already exists, it will be refreshed + initialContents: withContentAnnotations(withContentStatus(newContentArray("content1-8", "snapuid1-8", "snap1-8", "sid1-8", defaultClass, "", "volume-handle-1-8", retainPolicy, nil, &defaultSize, true), + &crdv1.VolumeSnapshotContentStatus{SnapshotHandle: toStringPointer("snapuid1-8"), RestoreSize: &defaultSize, ReadyToUse: &False}), + map[string]string{}), + expectedContents: withContentAnnotations(withContentStatus(newContentArray("content1-8", "snapuid1-8", "snap1-8", "sid1-8", defaultClass, "", "volume-handle-1-8", retainPolicy, nil, &defaultSize, true), + &crdv1.VolumeSnapshotContentStatus{SnapshotHandle: toStringPointer("snapuid1-8"), RestoreSize: &defaultSize, ReadyToUse: &False}), + map[string]string{}), + expectedEvents: noevents, + expectedCreateCalls: []createCall{ + { + volumeHandle: "volume-handle-1-8", + snapshotName: "snapshot-snapuid1-8", + driverName: mockDriverName, + snapshotId: "snapuid1-8", + parameters: map[string]string{ + utils.PrefixedVolumeSnapshotNameKey: "snap1-8", + utils.PrefixedVolumeSnapshotNamespaceKey: "default", + utils.PrefixedVolumeSnapshotContentNameKey: "content1-8", + }, + creationTime: timeNow, + readyToUse: false, + size: defaultSize, + }, + }, + errors: noerrors, + expectRequeue: true, + expectSuccess: true, + test: testSyncContent, + }, + { + name: "1-9: Un-ready snapshot that becomes ready should not be requeued", + // An un-ready snapshot already exists, it will be refreshed + initialContents: withContentAnnotations(withContentStatus(newContentArray("content1-9", "snapuid1-9", "snap1-9", "sid1-9", defaultClass, "", "volume-handle-1-9", retainPolicy, nil, &defaultSize, true), + &crdv1.VolumeSnapshotContentStatus{SnapshotHandle: toStringPointer("snapuid1-9"), RestoreSize: &defaultSize, ReadyToUse: &False}), + map[string]string{}), + expectedContents: withContentAnnotations(withContentStatus(newContentArray("content1-9", "snapuid1-9", "snap1-9", "sid1-9", defaultClass, "", "volume-handle-1-9", retainPolicy, nil, &defaultSize, true), + &crdv1.VolumeSnapshotContentStatus{SnapshotHandle: toStringPointer("snapuid1-9"), RestoreSize: &defaultSize, ReadyToUse: &True}), + map[string]string{}), + expectedEvents: noevents, + expectedCreateCalls: []createCall{ + { + volumeHandle: "volume-handle-1-9", + snapshotName: "snapshot-snapuid1-9", + driverName: mockDriverName, + snapshotId: "snapuid1-9", + parameters: map[string]string{ + utils.PrefixedVolumeSnapshotNameKey: "snap1-9", + utils.PrefixedVolumeSnapshotNamespaceKey: "default", + utils.PrefixedVolumeSnapshotContentNameKey: "content1-9", + }, + creationTime: timeNow, + readyToUse: true, + size: defaultSize, + }, + }, + errors: noerrors, + expectRequeue: false, + expectSuccess: true, + test: testSyncContent, + }, } runSyncContentTests(t, tests, snapshotClasses) diff --git a/pkg/sidecar-controller/framework_test.go b/pkg/sidecar-controller/framework_test.go index aef710c6a..849a524bf 100644 --- a/pkg/sidecar-controller/framework_test.go +++ b/pkg/sidecar-controller/framework_test.go @@ -97,9 +97,10 @@ type controllerTest struct { // Function to call as the test. test testCall expectSuccess bool + expectRequeue bool } -type testCall func(ctrl *csiSnapshotSideCarController, reactor *snapshotReactor, test controllerTest) error +type testCall func(ctrl *csiSnapshotSideCarController, reactor *snapshotReactor, test controllerTest) (requeue bool, err error) const ( testNamespace = "default" @@ -690,16 +691,16 @@ func withContentAnnotations(content []*crdv1.VolumeSnapshotContent, annotations return content } -func testSyncContent(ctrl *csiSnapshotSideCarController, reactor *snapshotReactor, test controllerTest) error { +func testSyncContent(ctrl *csiSnapshotSideCarController, reactor *snapshotReactor, test controllerTest) (bool, error) { return ctrl.syncContent(test.initialContents[0]) } -func testSyncContentError(ctrl *csiSnapshotSideCarController, reactor *snapshotReactor, test controllerTest) error { - err := ctrl.syncContent(test.initialContents[0]) +func testSyncContentError(ctrl *csiSnapshotSideCarController, reactor *snapshotReactor, test controllerTest) (bool, error) { + requeue, err := ctrl.syncContent(test.initialContents[0]) if err != nil { - return nil + return requeue, nil } - return fmt.Errorf("syncSnapshotContent succeeded when failure was expected") + return requeue, fmt.Errorf("syncSnapshotContent succeeded when failure was expected") } var ( @@ -725,7 +726,7 @@ var ( // controller waits for the operation lock. Controller is then resumed and we // check how it behaves. func wrapTestWithInjectedOperation(toWrap testCall, injectBeforeOperation func(ctrl *csiSnapshotSideCarController, reactor *snapshotReactor)) testCall { - return func(ctrl *csiSnapshotSideCarController, reactor *snapshotReactor, test controllerTest) error { + return func(ctrl *csiSnapshotSideCarController, reactor *snapshotReactor, test controllerTest) (bool, error) { // Inject a hook before async operation starts klog.V(4).Infof("reactor:injecting call") injectBeforeOperation(ctrl, reactor) @@ -733,10 +734,11 @@ func wrapTestWithInjectedOperation(toWrap testCall, injectBeforeOperation func(c // Run the tested function (typically syncContent) in a // separate goroutine. var testError error + var requeue bool var testFinished int32 go func() { - testError = toWrap(ctrl, reactor, test) + requeue, testError = toWrap(ctrl, reactor, test) // Let the "main" test function know that syncContent has finished. atomic.StoreInt32(&testFinished, 1) }() @@ -746,7 +748,7 @@ func wrapTestWithInjectedOperation(toWrap testCall, injectBeforeOperation func(c time.Sleep(time.Millisecond * 10) } - return testError + return requeue, testError } } @@ -804,13 +806,20 @@ func runSyncContentTests(t *testing.T, tests []controllerTest, snapshotClasses [ ctrl.classLister = storagelisters.NewVolumeSnapshotClassLister(indexer) // Run the tested functions - err = test.test(ctrl, reactor, test) + requeue, err := test.test(ctrl, reactor, test) if test.expectSuccess && err != nil { t.Errorf("Test %q failed: %v", test.name, err) } if !test.expectSuccess && err == nil { t.Errorf("Test %q failed: expected error, got nil", test.name) } + if !test.expectSuccess && err == nil { + t.Errorf("Test %q failed: expected error, got nil", test.name) + } + // requeue has meaning only when err == nil. A snapshot content is automatically requeued on error + if err == nil && requeue != test.expectRequeue { + t.Errorf("Test %q expected requeue %t, got %t", test.name, test.expectRequeue, requeue) + } // Wait for the target state err = reactor.waitTest(test) diff --git a/pkg/sidecar-controller/snapshot_controller.go b/pkg/sidecar-controller/snapshot_controller.go index 9d7c7aefd..02a93da05 100644 --- a/pkg/sidecar-controller/snapshot_controller.go +++ b/pkg/sidecar-controller/snapshot_controller.go @@ -51,8 +51,9 @@ import ( const controllerUpdateFailMsg = "snapshot controller failed to update" -// syncContent deals with one key off the queue. It returns false when it's time to quit. -func (ctrl *csiSnapshotSideCarController) syncContent(content *crdv1.VolumeSnapshotContent) error { +// syncContent deals with one key off the queue. It returns flag indicating if the +// content should be requeued. On error, the content is always requeued. +func (ctrl *csiSnapshotSideCarController) syncContent(content *crdv1.VolumeSnapshotContent) (requeue bool, err error) { klog.V(5).Infof("synchronizing VolumeSnapshotContent[%s]", content.Name) if ctrl.shouldDelete(content) { @@ -63,13 +64,22 @@ func (ctrl *csiSnapshotSideCarController) syncContent(content *crdv1.VolumeSnaps // underlying storage system. Note that the deletion snapshot operation will // update content SnapshotHandle to nil upon a successful deletion. At this // point, the finalizer on content should NOT be removed to avoid leaking. - return ctrl.deleteCSISnapshot(content) + err := ctrl.deleteCSISnapshot(content) + if err != nil { + return true, err + } + return false, nil } // otherwise, either the snapshot has been deleted from the underlying // storage system, or the deletion policy is Retain, remove the finalizer // if there is one so that API server could delete the object if there is // no other finalizer. - return ctrl.removeContentFinalizer(content) + err := ctrl.removeContentFinalizer(content) + if err != nil { + return true, err + } + return false, nil + } if content.Spec.Source.VolumeHandle != nil && content.Status == nil { klog.V(5).Infof("syncContent: Call CreateSnapshot for content %s", content.Name) @@ -79,11 +89,13 @@ func (ctrl *csiSnapshotSideCarController) syncContent(content *crdv1.VolumeSnaps // already true. We don't want to keep calling CreateSnapshot // or ListSnapshots CSI methods over and over again for // performance reasons. - var err error - if content.Status != nil && content.Status.ReadyToUse != nil && *content.Status.ReadyToUse == true { + if contentIsReady(content) { // Try to remove AnnVolumeSnapshotBeingCreated if it is not removed yet for some reason _, err = ctrl.removeAnnVolumeSnapshotBeingCreated(content) - return err + if err != nil { + return true, err + } + return false, nil } return ctrl.checkandUpdateContentStatus(content) } @@ -98,14 +110,15 @@ func (ctrl *csiSnapshotSideCarController) storeContentUpdate(content interface{} return utils.StoreObjectUpdate(ctrl.contentStore, content, "content") } -// createSnapshot starts new asynchronous operation to create snapshot -func (ctrl *csiSnapshotSideCarController) createSnapshot(content *crdv1.VolumeSnapshotContent) error { +// createSnapshot starts new asynchronous operation to create snapshot. It returns flag indicating if the +// content should be requeued. On error, the content is always requeued. +func (ctrl *csiSnapshotSideCarController) createSnapshot(content *crdv1.VolumeSnapshotContent) (requeue bool, err error) { klog.V(5).Infof("createSnapshot for content [%s]: started", content.Name) contentObj, err := ctrl.createSnapshotWrapper(content) if err != nil { ctrl.updateContentErrorStatusWithEvent(contentObj, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot: %v", err)) klog.Errorf("createSnapshot for content [%s]: error occurred in createSnapshotWrapper: %v", content.Name, err) - return err + return true, err } _, updateErr := ctrl.storeContentUpdate(contentObj) @@ -113,24 +126,26 @@ func (ctrl *csiSnapshotSideCarController) createSnapshot(content *crdv1.VolumeSn // We will get an "snapshot update" event soon, this is not a big error klog.V(4).Infof("createSnapshot for content [%s]: cannot update internal content cache: %v", content.Name, updateErr) } - return nil + return !contentIsReady(contentObj), nil } -func (ctrl *csiSnapshotSideCarController) checkandUpdateContentStatus(content *crdv1.VolumeSnapshotContent) error { +// checkandUpdateContentStatus checks status of the volume snapshot in CSI driver and updates content.status +// accordingly. It returns flag indicating if the content should be requeued. On error, the content is +// always requeued. +func (ctrl *csiSnapshotSideCarController) checkandUpdateContentStatus(content *crdv1.VolumeSnapshotContent) (requeue bool, err error) { klog.V(5).Infof("checkandUpdateContentStatus[%s] started", content.Name) contentObj, err := ctrl.checkandUpdateContentStatusOperation(content) if err != nil { ctrl.updateContentErrorStatusWithEvent(contentObj, v1.EventTypeWarning, "SnapshotContentCheckandUpdateFailed", fmt.Sprintf("Failed to check and update snapshot content: %v", err)) klog.Errorf("checkandUpdateContentStatus [%s]: error occurred %v", content.Name, err) - return err + return true, err } _, updateErr := ctrl.storeContentUpdate(contentObj) if updateErr != nil { // We will get an "snapshot update" event soon, this is not a big error klog.V(4).Infof("checkandUpdateContentStatus [%s]: cannot update internal cache: %v", content.Name, updateErr) } - - return nil + return !contentIsReady(contentObj), nil } // updateContentStatusWithEvent saves new content.Status to API server and emits @@ -380,6 +395,7 @@ func (ctrl *csiSnapshotSideCarController) deleteCSISnapshotOperation(content *cr return err } // trigger syncContent + // TODO: just enqueue the content object instead of calling syncContent directly ctrl.updateContentInInformerCache(newContent) return nil } @@ -689,3 +705,7 @@ func isCSIFinalError(err error) bool { // even start or failed. It is for sure not in progress. return true } + +func contentIsReady(content *crdv1.VolumeSnapshotContent) bool { + return content.Status != nil && content.Status.ReadyToUse != nil && *content.Status.ReadyToUse +} diff --git a/pkg/sidecar-controller/snapshot_controller_base.go b/pkg/sidecar-controller/snapshot_controller_base.go index 427d85def..44cb8fe25 100644 --- a/pkg/sidecar-controller/snapshot_controller_base.go +++ b/pkg/sidecar-controller/snapshot_controller_base.go @@ -18,9 +18,10 @@ package sidecar_controller import ( "fmt" - "github.com/kubernetes-csi/external-snapshotter/v6/pkg/group_snapshotter" "time" + "github.com/kubernetes-csi/external-snapshotter/v6/pkg/group_snapshotter" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" @@ -205,6 +206,7 @@ func (ctrl *csiSnapshotSideCarController) enqueueContentWork(obj interface{}) { return } klog.V(5).Infof("enqueued %q for sync", objName) + ctrl.contentQueue.Add(objName) } } @@ -223,11 +225,15 @@ func (ctrl *csiSnapshotSideCarController) processNextItem() bool { } defer ctrl.contentQueue.Done(keyObj) - if err := ctrl.syncContentByKey(keyObj.(string)); err != nil { - // Rather than wait for a full resync, re-add the key to the - // queue to be processed. - ctrl.contentQueue.AddRateLimited(keyObj) + requeue, err := ctrl.syncContentByKey(keyObj.(string)) + if err != nil { klog.V(4).Infof("Failed to sync content %q, will retry again: %v", keyObj.(string), err) + // Always requeue on error to be able to call functions like "return false, doSomething()" where doSomething + // does not need to worry about re-queueing. + requeue = true + } + if requeue { + ctrl.contentQueue.AddRateLimited(keyObj) return true } @@ -237,30 +243,32 @@ func (ctrl *csiSnapshotSideCarController) processNextItem() bool { return true } -func (ctrl *csiSnapshotSideCarController) syncContentByKey(key string) error { +// syncContentByKey syncs a single content. It returns true if the controller should +// requeue the item again. On error, content is always requeued. +func (ctrl *csiSnapshotSideCarController) syncContentByKey(key string) (requeue bool, err error) { klog.V(5).Infof("syncContentByKey[%s]", key) _, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { klog.V(4).Infof("error getting name of snapshotContent %q to get snapshotContent from informer: %v", key, err) - return nil + return false, nil } content, err := ctrl.contentLister.Get(name) // The content still exists in informer cache, the event must have // been add/update/sync if err == nil { if ctrl.isDriverMatch(content) { - err = ctrl.updateContentInInformerCache(content) + requeue, err = ctrl.updateContentInInformerCache(content) } if err != nil { // If error occurs we add this item back to the queue - return err + return true, err } - return nil + return requeue, nil } if !errors.IsNotFound(err) { klog.V(2).Infof("error getting content %q from informer: %v", key, err) - return nil + return false, nil } // The content is not in informer cache, the event must have been @@ -268,21 +276,21 @@ func (ctrl *csiSnapshotSideCarController) syncContentByKey(key string) error { contentObj, found, err := ctrl.contentStore.GetByKey(key) if err != nil { klog.V(2).Infof("error getting content %q from cache: %v", key, err) - return nil + return false, nil } if !found { // The controller has already processed the delete event and // deleted the content from its cache klog.V(2).Infof("deletion of content %q was already processed", key) - return nil + return false, nil } content, ok := contentObj.(*crdv1.VolumeSnapshotContent) if !ok { klog.Errorf("expected content, got %+v", content) - return nil + return false, nil } ctrl.deleteContentInCacheStore(content) - return nil + return false, nil } // isDriverMatch verifies whether the driver specified in VolumeSnapshotContent @@ -331,7 +339,7 @@ func (ctrl *csiSnapshotSideCarController) isDriverMatch(object interface{}) bool // updateContentInInformerCache runs in worker thread and handles "content added", // "content updated" and "periodic sync" events. -func (ctrl *csiSnapshotSideCarController) updateContentInInformerCache(content *crdv1.VolumeSnapshotContent) error { +func (ctrl *csiSnapshotSideCarController) updateContentInInformerCache(content *crdv1.VolumeSnapshotContent) (requeue bool, err error) { // Store the new content version in the cache and do not process it if this is // an old version. new, err := ctrl.storeContentUpdate(content) @@ -339,9 +347,9 @@ func (ctrl *csiSnapshotSideCarController) updateContentInInformerCache(content * klog.Errorf("%v", err) } if !new { - return nil + return false, nil } - err = ctrl.syncContent(content) + requeue, err = ctrl.syncContent(content) if err != nil { if errors.IsConflict(err) { // Version conflict error happens quite often and the controller @@ -350,9 +358,9 @@ func (ctrl *csiSnapshotSideCarController) updateContentInInformerCache(content * } else { klog.Errorf("could not sync content %q: %+v", content.Name, err) } - return err + return requeue, err } - return nil + return requeue, nil } // deleteContent runs in worker thread and handles "content deleted" event.