Skip to content

Commit

Permalink
Merge pull request #62 from lpabon/pr52
Browse files Browse the repository at this point in the history
Add a timeout to attach and detach calls
  • Loading branch information
jsafrane authored Aug 2, 2018
2 parents d82cc93 + c7cdcfc commit a4398f8
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 26 deletions.
3 changes: 2 additions & 1 deletion cmd/csi-attacher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var (
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
dummy = flag.Bool("dummy", false, "Run in dummy mode, i.e. not connecting to CSI driver and marking everything as attached. Expected CSI driver name is \"csi/dummy\".")
showVersion = flag.Bool("version", false, "Show version.")
timeout = flag.Duration("timeout", 15*time.Second, "Timeout for waiting for attaching or detaching the volume.")

enableLeaderElection = flag.Bool("leader-election", false, "Enable leader election.")
leaderElectionNamespace = flag.String("leader-election-namespace", "", "Namespace where this attacher runs.")
Expand Down Expand Up @@ -138,7 +139,7 @@ func main() {
pvLister := factory.Core().V1().PersistentVolumes().Lister()
nodeLister := factory.Core().V1().Nodes().Lister()
vaLister := factory.Storage().V1beta1().VolumeAttachments().Lister()
handler = controller.NewCSIHandler(clientset, attacher, csiConn, pvLister, nodeLister, vaLister)
handler = controller.NewCSIHandler(clientset, attacher, csiConn, pvLister, nodeLister, vaLister, timeout)
glog.V(2).Infof("CSI driver supports ControllerPublishUnpublish, using real CSI handler")
} else {
handler = controller.NewTrivialHandler(clientset)
Expand Down
13 changes: 10 additions & 3 deletions pkg/controller/csi_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"context"
"fmt"
"time"

"github.com/golang/glog"

Expand All @@ -45,17 +46,20 @@ type csiHandler struct {
nodeLister corelisters.NodeLister
vaLister storagelisters.VolumeAttachmentLister
vaQueue, pvQueue workqueue.RateLimitingInterface
timeout time.Duration
}

var _ Handler = &csiHandler{}

// NewCSIHandler creates a new CSIHandler.
func NewCSIHandler(
client kubernetes.Interface,
attacherName string,
csiConnection connection.CSIConnection,
pvLister corelisters.PersistentVolumeLister,
nodeLister corelisters.NodeLister,
vaLister storagelisters.VolumeAttachmentLister) Handler {
vaLister storagelisters.VolumeAttachmentLister,
timeout *time.Duration) Handler {

return &csiHandler{
client: client,
Expand All @@ -64,6 +68,7 @@ func NewCSIHandler(
pvLister: pvLister,
nodeLister: nodeLister,
vaLister: vaLister,
timeout: *timeout,
}
}

Expand Down Expand Up @@ -258,7 +263,8 @@ func (h *csiHandler) csiAttach(va *storage.VolumeAttachment) (*storage.VolumeAtt
return va, nil, fmt.Errorf("could not add VolumeAttachment finalizer: %s", err)
}

ctx := context.TODO()
ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
defer cancel()
// We're not interested in `detached` return value, the controller will
// issue Detach to be sure the volume is really detached.
publishInfo, _, err := h.csiConnection.Attach(ctx, volumeHandle, readOnly, nodeID, volumeCapabilities, attributes, secrets)
Expand Down Expand Up @@ -296,7 +302,8 @@ func (h *csiHandler) csiDetach(va *storage.VolumeAttachment) (*storage.VolumeAtt
return va, err
}

ctx := context.TODO()
ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
defer cancel()
detached, err := h.csiConnection.Detach(ctx, volumeHandle, nodeID, secrets)
if err != nil && !detached {
// The volume may not be fully detached. Save the error and try again
Expand Down
75 changes: 53 additions & 22 deletions pkg/controller/csi_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"testing"
"time"

"github.com/kubernetes-csi/external-attacher/pkg/connection"

Expand All @@ -39,14 +40,18 @@ const (
fin = "external-attacher/csi-test"
)

var timeout = 10 * time.Millisecond

func csiHandlerFactory(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi connection.CSIConnection) Handler {
return NewCSIHandler(
client,
testAttacherName,
csi,
informerFactory.Core().V1().PersistentVolumes().Lister(),
informerFactory.Core().V1().Nodes().Lister(),
informerFactory.Storage().V1beta1().VolumeAttachments().Lister())
informerFactory.Storage().V1beta1().VolumeAttachments().Lister(),
&timeout,
)
}

func pv() *v1.PersistentVolume {
Expand Down Expand Up @@ -168,7 +173,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)),
},
expectedCSICalls: []csiCall{
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata},
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, 0},
},
},
{
Expand All @@ -181,7 +186,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)),
},
expectedCSICalls: []csiCall{
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata},
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, 0},
},
},
{
Expand All @@ -194,7 +199,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)),
},
expectedCSICalls: []csiCall{
{"attach", testVolumeHandle, testNodeID, map[string]string{"foo": "bar"}, noSecrets, success, notDetached, noMetadata},
{"attach", testVolumeHandle, testNodeID, map[string]string{"foo": "bar"}, noSecrets, success, notDetached, noMetadata, 0},
},
},
{
Expand All @@ -208,7 +213,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)),
},
expectedCSICalls: []csiCall{
{"attach", testVolumeHandle, testNodeID, noAttrs, map[string]string{"foo": "bar"}, success, notDetached, noMetadata},
{"attach", testVolumeHandle, testNodeID, noAttrs, map[string]string{"foo": "bar"}, success, notDetached, noMetadata, 0},
},
},
{
Expand All @@ -222,7 +227,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)),
},
expectedCSICalls: []csiCall{
{"attach", testVolumeHandle, testNodeID, noAttrs, map[string]string{}, success, notDetached, noMetadata},
{"attach", testVolumeHandle, testNodeID, noAttrs, map[string]string{}, success, notDetached, noMetadata, 0},
},
},
{
Expand All @@ -247,7 +252,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)),
},
expectedCSICalls: []csiCall{
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata},
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, 0},
},
},
{
Expand Down Expand Up @@ -285,7 +290,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)),
},
expectedCSICalls: []csiCall{
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata},
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, 0},
},
},
{
Expand Down Expand Up @@ -314,7 +319,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, vaWithMetadata(va(true, fin), map[string]string{"foo": "bar"})),
},
expectedCSICalls: []csiCall{
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, map[string]string{"foo": "bar"}},
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, map[string]string{"foo": "bar"}, 0},
},
},
{
Expand Down Expand Up @@ -407,7 +412,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)),
},
expectedCSICalls: []csiCall{
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata},
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, 0},
},
},
{
Expand Down Expand Up @@ -438,8 +443,8 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)),
},
expectedCSICalls: []csiCall{
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata},
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata},
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, 0},
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, 0},
},
},
{
Expand All @@ -453,8 +458,22 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)),
},
expectedCSICalls: []csiCall{
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, fmt.Errorf("mock error"), notDetached, noMetadata},
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata},
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, fmt.Errorf("mock error"), notDetached, noMetadata, 0},
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, 0},
},
},
{
name: "CSI attach times out -> controller retries",
initialObjects: []runtime.Object{pvWithFinalizer(), node()},
addedVA: va(false, ""),
expectedActions: []core.Action{
// Finalizer is saved first
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(false /*attached*/, fin)),
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, va(true /*attached*/, fin)),
},
expectedCSICalls: []csiCall{
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, 500 * time.Millisecond},
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, notDetached, noMetadata, time.Duration(0)},
},
},
//
Expand All @@ -468,7 +487,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, ""))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata, 0},
},
},
{
Expand All @@ -480,7 +499,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, ""))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{"foo": "bar"}, success, detached, noMetadata},
{"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{"foo": "bar"}, success, detached, noMetadata, 0},
},
},
{
Expand All @@ -492,7 +511,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, ""))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{}, success, detached, noMetadata},
{"detach", testVolumeHandle, testNodeID, noAttrs, map[string]string{}, success, detached, noMetadata, 0},
},
},
{
Expand All @@ -514,8 +533,20 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, ""))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, fmt.Errorf("mock error"), notDetached, noMetadata},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, fmt.Errorf("mock error"), notDetached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata, 0},
},
},
{
name: "CSI detach times out -> controller retries",
initialObjects: []runtime.Object{pvWithFinalizer(), node()},
addedVA: deleted(va(true, fin)),
expectedActions: []core.Action{
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, ""))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata, 500 * time.Millisecond},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata, time.Duration(0)},
},
},
{
Expand All @@ -526,7 +557,7 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false /*attached*/, ""))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, fmt.Errorf("mock error"), detached, noMetadata},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, fmt.Errorf("mock error"), detached, noMetadata, 0},
},
},
{
Expand Down Expand Up @@ -617,8 +648,8 @@ func TestCSIHandler(t *testing.T) {
core.NewUpdateAction(vaGroupResourceVersion, metav1.NamespaceNone, deleted(va(false, ""))),
},
expectedCSICalls: []csiCall{
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata, 0},
{"detach", testVolumeHandle, testNodeID, noAttrs, noSecrets, success, detached, noMetadata, 0},
},
},

Expand Down
13 changes: 13 additions & 0 deletions pkg/controller/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ type csiCall struct {
detached bool
// metadata to return (used only in Attach calls)
metadata map[string]string
// Force the attach or detach to take a certain amount of time
delay time.Duration
}

type handlerFactory func(client kubernetes.Interface, informerFactory informers.SharedInformerFactory, csi connection.CSIConnection) Handler
Expand Down Expand Up @@ -341,9 +343,15 @@ func (f *fakeCSIConnection) Attach(ctx context.Context, volumeID string, readOnl
f.t.Errorf("Unexpected CSI Attach call: volume=%s, node=%s, index: %d, calls: %+v", volumeID, nodeID, f.index, f.calls)
return nil, true, fmt.Errorf("unexpected call")
}

call := f.calls[f.index]
f.index++

// Force a delay
if call.delay != time.Duration(0) {
time.Sleep(call.delay)
}

var err error
if call.functionName != "attach" {
f.t.Errorf("Unexpected CSI Attach call: volume=%s, node=%s, expected: %s", volumeID, nodeID, call.functionName)
Expand Down Expand Up @@ -382,6 +390,11 @@ func (f *fakeCSIConnection) Detach(ctx context.Context, volumeID string, nodeID
call := f.calls[f.index]
f.index++

// Force a delay
if call.delay != time.Duration(0) {
time.Sleep(call.delay)
}

var err error
if call.functionName != "detach" {
f.t.Errorf("Unexpected CSI Detach call: volume=%s, node=%s, expected: %s", volumeID, nodeID, call.functionName)
Expand Down

0 comments on commit a4398f8

Please sign in to comment.