diff --git a/lib/controller/controller.go b/lib/controller/controller.go
index 24855622bbc..496f13f6231 100644
--- a/lib/controller/controller.go
+++ b/lib/controller/controller.go
@@ -25,7 +25,6 @@ import (
 	"time"
 
 	"github.com/golang/glog"
-	"github.com/kubernetes-incubator/external-storage/lib/leaderelection"
 	rl "github.com/kubernetes-incubator/external-storage/lib/leaderelection/resourcelock"
 	"k8s.io/api/core/v1"
 	storage "k8s.io/api/storage/v1"
@@ -126,10 +125,6 @@ type ProvisionController struct {
 	// when multiple controllers are running: they race to lock (lead) every PVC
 	// so that only one calls Provision for it (saving API calls, CPU cycles...)
 	leaseDuration, renewDeadline, retryPeriod, termLimit time.Duration
-	// Map of claim UID to LeaderElector: for checking if this controller
-	// is the leader of a given claim
-	leaderElectors      map[types.UID]*leaderelection.LeaderElector
-	leaderElectorsMutex *sync.Mutex
 
 	hasRun     bool
 	hasRunLock *sync.Mutex
@@ -360,8 +355,6 @@ func NewProvisionController(
 		renewDeadline:       DefaultRenewDeadline,
 		retryPeriod:         DefaultRetryPeriod,
 		termLimit:           DefaultTermLimit,
-		leaderElectors:      make(map[types.UID]*leaderelection.LeaderElector),
-		leaderElectorsMutex: &sync.Mutex{},
 		hasRun:              false,
 		hasRunLock:          &sync.Mutex{},
 	}
@@ -530,23 +523,12 @@ func (ctrl *ProvisionController) addClaim(obj interface{}) {
 	}
 
 	if ctrl.shouldProvision(claim) {
-		ctrl.leaderElectorsMutex.Lock()
-		le, ok := ctrl.leaderElectors[claim.UID]
-		ctrl.leaderElectorsMutex.Unlock()
-		if ok && le.IsLeader() {
-			opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
-			ctrl.scheduleOperation(opName, func() error {
-				err := ctrl.provisionClaimOperation(claim)
-				ctrl.updateProvisionStats(claim, err)
-				return err
-			})
-		} else {
-			opName := fmt.Sprintf("lock-provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
-			ctrl.scheduleOperation(opName, func() error {
-				ctrl.lockProvisionClaimOperation(claim)
-				return nil
-			})
-		}
+		opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
+		ctrl.scheduleOperation(opName, func() error {
+			err := ctrl.provisionClaimOperation(claim)
+			ctrl.updateProvisionStats(claim, err)
+			return err
+		})
 	}
 }
 
@@ -716,75 +698,6 @@ func (ctrl *ProvisionController) shouldDelete(volume *v1.PersistentVolume) bool
 	return true
 }
 
-// lockProvisionClaimOperation wraps provisionClaimOperation. In case other
-// controllers are serving the same claims, to prevent them all from creating
-// volumes for a claim & racing to submit their PV, each controller creates a
-// LeaderElector to instead race for the leadership (lock), where only the
-// leader is tasked with provisioning & may try to do so
-func (ctrl *ProvisionController) lockProvisionClaimOperation(claim *v1.PersistentVolumeClaim) {
-	stoppedLeading := false
-	rl := rl.ProvisionPVCLock{
-		PVCMeta: claim.ObjectMeta,
-		Client:  ctrl.client,
-		LockConfig: rl.Config{
-			Identity:      string(ctrl.identity),
-			EventRecorder: ctrl.eventRecorder,
-		},
-	}
-	le, err := leaderelection.NewLeaderElector(leaderelection.Config{
-		Lock:          &rl,
-		LeaseDuration: ctrl.leaseDuration,
-		RenewDeadline: ctrl.renewDeadline,
-		RetryPeriod:   ctrl.retryPeriod,
-		TermLimit:     ctrl.termLimit,
-		Callbacks: leaderelection.LeaderCallbacks{
-			OnStartedLeading: func(_ <-chan struct{}) {
-				opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
-				ctrl.scheduleOperation(opName, func() error {
-					err := ctrl.provisionClaimOperation(claim)
-					ctrl.updateProvisionStats(claim, err)
-					return err
-				})
-			},
-			OnStoppedLeading: func() {
-				stoppedLeading = true
-			},
-		},
-	})
-	if err != nil {
-		glog.Errorf("Error creating LeaderElector, can't provision for claim %q: %v", claimToClaimKey(claim), err)
-		return
-	}
-
-	ctrl.leaderElectorsMutex.Lock()
-	ctrl.leaderElectors[claim.UID] = le
-	ctrl.leaderElectorsMutex.Unlock()
-
-	// To determine when to stop trying to acquire/renew the lock, watch for
-	// provisioning success/failure. (The leader could get the result of its
-	// operation but it has to watch anyway)
-	stopCh := make(chan struct{})
-	successCh, err := ctrl.watchProvisioning(claim, stopCh)
-	if err != nil {
-		glog.Errorf("Error watching for provisioning success, can't provision for claim %q: %v", claimToClaimKey(claim), err)
-	}
-
-	le.Run(successCh)
-
-	close(stopCh)
-
-	// If we were the leader and stopped, give others a chance to acquire
-	// (whether they exist & want to or not). Else, there must have been a
-	// success so just proceed.
-	if stoppedLeading {
-		time.Sleep(ctrl.leaseDuration + ctrl.retryPeriod)
-	}
-
-	ctrl.leaderElectorsMutex.Lock()
-	delete(ctrl.leaderElectors, claim.UID)
-	ctrl.leaderElectorsMutex.Unlock()
-}
-
 func (ctrl *ProvisionController) updateProvisionStats(claim *v1.PersistentVolumeClaim, err error) {
 	ctrl.failedProvisionStatsMutex.Lock()
 	defer ctrl.failedProvisionStatsMutex.Unlock()
@@ -838,9 +751,13 @@ func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVol
 	// A previous doProvisionClaim may just have finished while we were waiting for
 	// the locks. Check that PV (with deterministic name) hasn't been provisioned
 	// yet.
 	pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
-	volume, err := ctrl.client.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})
-	if err == nil && volume != nil {
+	// PersistentVolumes are cluster-scoped, so the informer cache is keyed by name alone.
+	_, exists, err := ctrl.volumes.GetByKey(pvName)
+	if err != nil {
+		glog.Errorf("Error getting claim %q's volume: %v", claimToClaimKey(claim), err)
+		return nil
+	} else if exists {
 		// Volume has been already provisioned, nothing to do.
 		glog.V(4).Infof("provisionClaimOperation [%s]: volume already exists, skipping", claimToClaimKey(claim))
 		return nil
@@ -884,7 +801,7 @@ func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVol
 
 	ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", claimToClaimKey(claim)))
 
-	volume, err = ctrl.provisioner.Provision(options)
+	volume, err := ctrl.provisioner.Provision(options)
 	if err != nil {
 		if ierr, ok := err.(*IgnoredError); ok {
 			// Provision ignored, do nothing and hope another provisioner will provision it.
@@ -1118,10 +1035,21 @@ func (ctrl *ProvisionController) deleteVolumeOperation(volume *v1.PersistentVolu
 	// Our check does not have to be as sophisticated as PV controller's, we can
 	// trust that the PV controller has set the PV to Released/Failed and it's
 	// ours to delete
-	newVolume, err := ctrl.client.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
+	newVolumeObject, exists, err := ctrl.volumes.GetByKey(volume.Name)
 	if err != nil {
+		glog.Errorf("error getting persistentvolume by name %s: %v, skipping", volume.Name, err)
+		return nil
+	} else if !exists {
+		glog.Infof("persistentvolume %s does not exist, skipping", volume.Name)
 		return nil
 	}
+
+	newVolume, ok := newVolumeObject.(*v1.PersistentVolume)
+	if !ok {
+		glog.Errorf("expected persistentvolume but got %+v, skipping", newVolumeObject)
+		return nil
+	}
+
 	if !ctrl.shouldDelete(newVolume) {
 		glog.Infof("volume %q no longer needs deletion, skipping", volume.Name)
 		return nil
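
The provision and delete paths above now consult the controller's informer-backed volume store instead of issuing live API GETs. Below is a minimal, self-contained sketch of that lookup pattern, assuming ctrl.volumes behaves like a client-go cache.Store keyed by MetaNamespaceKeyFunc (just the object name for cluster-scoped PersistentVolumes); the store, the example PV name, and the printed messages are illustrative stand-ins, not code from the controller itself.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// A store keyed the way an informer's store is keyed: for cluster-scoped
	// PersistentVolumes, MetaNamespaceKeyFunc yields the bare object name.
	volumes := cache.NewStore(cache.MetaNamespaceKeyFunc)

	// Seed the store with a stand-in PV, as the controller's PV informer would.
	pv := &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "pvc-1234"}}
	if err := volumes.Add(pv); err != nil {
		fmt.Printf("error adding volume to store: %v\n", err)
		return
	}

	// The same three-step pattern the patched operations use:
	// GetByKey, an existence check, then a type assertion.
	obj, exists, err := volumes.GetByKey("pvc-1234")
	if err != nil {
		fmt.Printf("error getting volume from store: %v\n", err)
		return
	}
	if !exists {
		fmt.Println("volume not found in store")
		return
	}
	cached, ok := obj.(*v1.PersistentVolume)
	if !ok {
		fmt.Printf("expected *v1.PersistentVolume but got %+v\n", obj)
		return
	}
	fmt.Printf("found cached volume %q without an API round trip\n", cached.Name)
}

The cache read trades an API round trip for the possibility of slightly stale data, which is the usual trade-off when moving a controller from direct GETs to informer lookups.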