diff --git a/cmd/csi-attacher/main.go b/cmd/csi-attacher/main.go index 58680f3d4..c184c97fd 100644 --- a/cmd/csi-attacher/main.go +++ b/cmd/csi-attacher/main.go @@ -53,6 +53,7 @@ var ( csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.") dummy = flag.Bool("dummy", false, "Run in dummy mode, i.e. not connecting to CSI driver and marking everything as attached. Expected CSI driver name is \"csi/dummy\".") showVersion = flag.Bool("version", false, "Show version.") + timeout = flag.Duration("timeout", 15*time.Second, "Timeout for waiting for attaching or detaching the volume.") enableLeaderElection = flag.Bool("leader-election", false, "Enable leader election.") leaderElectionNamespace = flag.String("leader-election-namespace", "", "Namespace where this attacher runs.") @@ -138,7 +139,7 @@ func main() { pvLister := factory.Core().V1().PersistentVolumes().Lister() nodeLister := factory.Core().V1().Nodes().Lister() vaLister := factory.Storage().V1beta1().VolumeAttachments().Lister() - handler = controller.NewCSIHandler(clientset, attacher, csiConn, pvLister, nodeLister, vaLister) + handler = controller.NewCSIHandler(clientset, attacher, csiConn, pvLister, nodeLister, vaLister, timeout) glog.V(2).Infof("CSI driver supports ControllerPublishUnpublish, using real CSI handler") } else { handler = controller.NewTrivialHandler(clientset) diff --git a/pkg/controller/csi_handler.go b/pkg/controller/csi_handler.go index 351b7da7e..bb8c9d7c4 100644 --- a/pkg/controller/csi_handler.go +++ b/pkg/controller/csi_handler.go @@ -19,6 +19,7 @@ package controller import ( "context" "fmt" + "time" "github.com/golang/glog" @@ -45,17 +46,20 @@ type csiHandler struct { nodeLister corelisters.NodeLister vaLister storagelisters.VolumeAttachmentLister vaQueue, pvQueue workqueue.RateLimitingInterface + timeout time.Duration } var _ Handler = &csiHandler{} +// NewCSIHandler creates a new CSIHandler. func NewCSIHandler( client kubernetes.Interface, attacherName string, csiConnection connection.CSIConnection, pvLister corelisters.PersistentVolumeLister, nodeLister corelisters.NodeLister, - vaLister storagelisters.VolumeAttachmentLister) Handler { + vaLister storagelisters.VolumeAttachmentLister, + timeout *time.Duration) Handler { return &csiHandler{ client: client, @@ -64,6 +68,7 @@ func NewCSIHandler( pvLister: pvLister, nodeLister: nodeLister, vaLister: vaLister, + timeout: *timeout, } } @@ -258,7 +263,8 @@ func (h *csiHandler) csiAttach(va *storage.VolumeAttachment) (*storage.VolumeAtt return va, nil, fmt.Errorf("could not add VolumeAttachment finalizer: %s", err) } - ctx := context.TODO() + ctx, cancel := context.WithTimeout(context.Background(), h.timeout) + defer cancel() // We're not interested in `detached` return value, the controller will // issue Detach to be sure the volume is really detached. publishInfo, _, err := h.csiConnection.Attach(ctx, volumeHandle, readOnly, nodeID, volumeCapabilities, attributes, secrets) @@ -296,7 +302,8 @@ func (h *csiHandler) csiDetach(va *storage.VolumeAttachment) (*storage.VolumeAtt return va, err } - ctx := context.TODO() + ctx, cancel := context.WithTimeout(context.Background(), h.timeout) + defer cancel() detached, err := h.csiConnection.Detach(ctx, volumeHandle, nodeID, secrets) if err != nil && !detached { // The volume may not be fully detached. Save the error and try again diff --git a/pkg/controller/csi_handler_test.go b/pkg/controller/csi_handler_test.go index 455cf2f00..032dd7e41 100644 --- a/pkg/controller/csi_handler_test.go +++ b/pkg/controller/csi_handler_test.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "testing" + "time" "github.com/kubernetes-csi/external-attacher/pkg/connection" @@ -39,6 +40,8 @@ const ( fin = "external-attacher/csi-test" ) +var timeout = 10 * time.Millisecond + func csiHandlerFactory(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi connection.CSIConnection) Handler { return NewCSIHandler( client, @@ -46,7 +49,9 @@ func csiHandlerFactory(client kubernetes.Interface, informerFactory informers.Sh csi, informerFactory.Core().V1().PersistentVolumes().Lister(), informerFactory.Core().V1().Nodes().Lister(), - informerFactory.Storage().V1beta1().VolumeAttachments().Lister()) + informerFactory.Storage().V1beta1().VolumeAttachments().Lister(), + &timeout, + ) } func pv() *v1.PersistentVolume { @@ -168,7 +173,7 @@ func TestCSIHandler(t *testing.T) { core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)), }, expectedCSICalls: []csiCall{ - {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata}, + {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, 0}, }, }, { @@ -181,7 +186,7 @@ func TestCSIHandler(t *testing.T) { core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)), }, expectedCSICalls: []csiCall{ - {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata}, + {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, 0}, }, }, { @@ -194,7 +199,7 @@ func TestCSIHandler(t *testing.T) { core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)), }, expectedCSICalls: []csiCall{ - {"attach", testVolumeHandle, testNodeID, map[string]string{"foo": "bar"}, noSecrets, success, notDetached, noMetadata}, + {"attach", testVolumeHandle, testNodeID, map[string]string{"foo": "bar"}, noSecrets, success, notDetached, noMetadata, 0}, }, }, { @@ -208,7 +213,7 @@ func TestCSIHandler(t *testing.T) { core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)), }, expectedCSICalls: []csiCall{ - {"attach", testVolumeHandle, testNodeID, noAttrs, map[string]string{"foo": "bar"}, success, notDetached, noMetadata}, + {"attach", testVolumeHandle, testNodeID, noAttrs, map[string]string{"foo": "bar"}, success, notDetached, noMetadata, 0}, }, }, { @@ -222,7 +227,7 @@ func TestCSIHandler(t *testing.T) { core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)), }, expectedCSICalls: []csiCall{ - {"attach", testVolumeHandle, testNodeID, noAttrs, map[string]string{}, success, notDetached, noMetadata}, + {"attach", testVolumeHandle, testNodeID, noAttrs, map[string]string{}, success, notDetached, noMetadata, 0}, }, }, { @@ -247,7 +252,7 @@ func TestCSIHandler(t *testing.T) { core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)), }, expectedCSICalls: []csiCall{ - {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata}, + {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, 0}, }, }, { @@ -285,7 +290,7 @@ func TestCSIHandler(t *testing.T) { core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)), }, expectedCSICalls: []csiCall{ - {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata}, + {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, 0}, }, }, { @@ -314,7 +319,7 @@ func TestCSIHandler(t *testing.T) { core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, vaWithMetadata(va(true, fin), map[string]string{"foo": "bar"})), }, expectedCSICalls: []csiCall{ - {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, map[string]string{"foo": "bar"}}, + {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, map[string]string{"foo": "bar"}, 0}, }, }, { @@ -407,7 +412,7 @@ func TestCSIHandler(t *testing.T) { core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)), }, expectedCSICalls: []csiCall{ - {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata}, + {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, 0}, }, }, { @@ -438,8 +443,8 @@ func TestCSIHandler(t *testing.T) { core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)), }, expectedCSICalls: []csiCall{ - {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata}, - {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata}, + {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, 0}, + {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, 0}, }, }, { @@ -453,8 +458,22 @@ func TestCSIHandler(t *testing.T) { core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)), }, expectedCSICalls: []csiCall{ - {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, fmt.Errorf("mock error"), notDetached, noMetadata}, - {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata}, + {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, fmt.Errorf("mock error"), notDetached, noMetadata, 0}, + {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, 0}, + }, + }, + { + name: "CSI attach times out -> controller retries", + initialObjects: []runtime.Object{pvWithFinalizer(), node()}, + addedVA: va(false, ""), + expectedActions: []core.Action{ + // Finalizer is saved first + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(false /*attached*/, fin)), + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)), + }, + expectedCSICalls: []csiCall{ + {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, 500 * time.Millisecond}, + {"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, time.Duration(0)}, }, }, // @@ -468,7 +487,7 @@ func TestCSIHandler(t *testing.T) { core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, ""))), }, expectedCSICalls: []csiCall{ - {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata}, + {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata, 0}, }, }, { @@ -480,7 +499,7 @@ func TestCSIHandler(t *testing.T) { core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, ""))), }, expectedCSICalls: []csiCall{ - {"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{"foo": "bar"}, success, detached, noMetadata}, + {"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{"foo": "bar"}, success, detached, noMetadata, 0}, }, }, { @@ -492,7 +511,7 @@ func TestCSIHandler(t *testing.T) { core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, ""))), }, expectedCSICalls: []csiCall{ - {"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{}, success, detached, noMetadata}, + {"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{}, success, detached, noMetadata, 0}, }, }, { @@ -514,8 +533,20 @@ func TestCSIHandler(t *testing.T) { core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, ""))), }, expectedCSICalls: []csiCall{ - {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, fmt.Errorf("mock error"), notDetached, noMetadata}, - {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata}, + {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, fmt.Errorf("mock error"), notDetached, noMetadata, 0}, + {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata, 0}, + }, + }, + { + name: "CSI detach times out -> controller retries", + initialObjects: []runtime.Object{pvWithFinalizer(), node()}, + addedVA: deleted(va(true, fin)), + expectedActions: []core.Action{ + core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, ""))), + }, + expectedCSICalls: []csiCall{ + {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata, 500 * time.Millisecond}, + {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata, time.Duration(0)}, }, }, { @@ -526,7 +557,7 @@ func TestCSIHandler(t *testing.T) { core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, ""))), }, expectedCSICalls: []csiCall{ - {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, fmt.Errorf("mock error"), detached, noMetadata}, + {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, fmt.Errorf("mock error"), detached, noMetadata, 0}, }, }, { @@ -617,8 +648,8 @@ func TestCSIHandler(t *testing.T) { core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false, ""))), }, expectedCSICalls: []csiCall{ - {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata}, - {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata}, + {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata, 0}, + {"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata, 0}, }, }, diff --git a/pkg/controller/framework_test.go b/pkg/controller/framework_test.go index fa78b9309..6f18137b3 100644 --- a/pkg/controller/framework_test.go +++ b/pkg/controller/framework_test.go @@ -92,6 +92,8 @@ type csiCall struct { detached bool // metadata to return (used only in Attach calls) metadata map[string]string + // Force the attach or detach to take a certain amount of time + delay time.Duration } type handlerFactory func(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi connection.CSIConnection) Handler @@ -341,9 +343,15 @@ func (f *fakeCSIConnection) Attach(ctx context.Context, volumeID string, readOnl f.t.Errorf("Unexpected CSI Attach call: volume=%s, node=%s, index: %d, calls: %+v", volumeID, nodeID, f.index, f.calls) return nil, true, fmt.Errorf("unexpected call") } + call := f.calls[f.index] f.index++ + // Force a delay + if call.delay != time.Duration(0) { + time.Sleep(call.delay) + } + var err error if call.functionName != "attach" { f.t.Errorf("Unexpected CSI Attach call: volume=%s, node=%s, expected: %s", volumeID, nodeID, call.functionName) @@ -382,6 +390,11 @@ func (f *fakeCSIConnection) Detach(ctx context.Context, volumeID string, nodeID call := f.calls[f.index] f.index++ + // Force a delay + if call.delay != time.Duration(0) { + time.Sleep(call.delay) + } + var err error if call.functionName != "detach" { f.t.Errorf("Unexpected CSI Detach call: volume=%s, node=%s, expected: %s", volumeID, nodeID, call.functionName)