Skip to content

Commit

Permalink
Leader election conflict with csi-resizer bug fix
Browse files Browse the repository at this point in the history
Signed-off-by: torredil <[email protected]>
  • Loading branch information
torredil committed Oct 3, 2023
1 parent 69e91a9 commit a5b11ed
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 32 deletions.
82 changes: 50 additions & 32 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,32 @@ import (
"fmt"
"net/http"
"os"
"sync/atomic"
"time"

csi "github.com/awslabs/volume-modifier-for-k8s/pkg/client"
"github.com/awslabs/volume-modifier-for-k8s/pkg/controller"
"github.com/awslabs/volume-modifier-for-k8s/pkg/modifier"
"github.com/awslabs/volume-modifier-for-k8s/pkg/util"
"github.com/kubernetes-csi/csi-lib-utils/leaderelection"
"github.com/kubernetes-csi/csi-lib-utils/metrics"
v1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)

var (
controllerStatus atomic.Value
)

func init() {
controllerStatus.Store(false)
}

var (
clientConfigUrl = flag.String("client-config-url", "", "URL to build a client config from. Either this or kubeconfig needs to be set if the provisioner is being run out of cluster.")
kubeConfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig")
Expand Down Expand Up @@ -64,6 +73,11 @@ func main() {
}
klog.Infof("Version : %s", version)

podName := os.Getenv("POD_NAME")
if podName == "" {
klog.Fatal("POD_NAME environment variable is not set")
}

addr := *httpEndpoint
var config *rest.Config
var err error
Expand Down Expand Up @@ -134,37 +148,41 @@ func main() {
true, /* retryFailure */
)

run := func(ctx context.Context) {
informerFactory.Start(wait.NeverStop)
mc.Run(*workers, ctx)
}
runContext, cancel := context.WithCancel(context.Background())
leaseInformer := informerFactory.Coordination().V1().Leases().Informer()

if !*enableLeaderElection {
run(context.TODO())
} else {
// Ensure volume-modifier-for-k8s and external-resizer sidecars always elect the same leader
// by putting them on the same lease that is identified by the lock name.
externalResizerLockName := "external-resizer-" + util.SanitizeName(driverName)
leKubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatal(err.Error())
}
le := leaderelection.NewLeaderElection(leKubeClient, externalResizerLockName, run)
if *httpEndpoint != "" {
le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout)
}

if *leaderElectionNamespace != "" {
le.WithNamespace(*leaderElectionNamespace)
}

le.WithLeaseDuration(*leaderElectionLeaseDuration)
le.WithRenewDeadline(*leaderElectionRenewDeadline)
le.WithRetryPeriod(*leaderElectionRetryPeriod)

if err := le.Run(); err != nil {
klog.Fatalf("error initializing leader election: %v", err)
}
leaseInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
lease, ok := newObj.(*v1.Lease)
if !ok {
klog.ErrorS(nil, "Failed to process object, expected it to be a Lease", "obj", newObj)
return
}
handleLeaseUpdate(lease, podName, runContext, cancel, mc)
},
})
leaseInformer.Run(wait.NeverStop)
}

func handleLeaseUpdate(lease *v1.Lease, podName string, runContext context.Context, cancel context.CancelFunc, mc controller.ModifyController) {
// If the updated Lease is not relevant, return early
if lease.Name != "external-resizer-ebs-csi-aws-com" {
return
}
// Extract the current leader from the Lease
currentLeader := *lease.Spec.HolderIdentity

klog.V(6).InfoS("Controller status", "podName", podName, "currentLeader", currentLeader, "controllerStatus", controllerStatus.Load().(bool))

if currentLeader == podName && !controllerStatus.Load().(bool) {
// If the current leader is the current pod, and the controller is not running, start it
klog.InfoS("Starting controller", "podName", podName, "currentLeader", currentLeader)
controllerStatus.Store(true)
go mc.Run(*workers, runContext)
} else if currentLeader != podName && controllerStatus.Load().(bool) {
// If the current leader is not the current pod, and the controller is running, stop it
klog.InfoS("Stopping controller", "podName", podName, "currentLeader", currentLeader)
cancel()
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"context"
"fmt"
"os"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -65,6 +66,7 @@ func NewModifyController(
DeleteFunc: ctrl.deletePVC,
}, resyncPeriod)

informerFactory.Start(wait.NeverStop)
return ctrl
}

Expand Down Expand Up @@ -106,6 +108,7 @@ func (c *modifyController) Run(workers int, ctx context.Context) {
}

<-stopCh
os.Exit(0)
}

func (c *modifyController) addPVC(obj interface{}) {
Expand Down

0 comments on commit a5b11ed

Please sign in to comment.