diff --git a/cmd/main.go b/cmd/main.go index 059fb79..83e91c2 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -11,13 +11,13 @@ import ( csi "github.com/awslabs/volume-modifier-for-k8s/pkg/client" "github.com/awslabs/volume-modifier-for-k8s/pkg/controller" "github.com/awslabs/volume-modifier-for-k8s/pkg/modifier" - "github.com/awslabs/volume-modifier-for-k8s/pkg/util" - "github.com/kubernetes-csi/csi-lib-utils/leaderelection" "github.com/kubernetes-csi/csi-lib-utils/metrics" + v1 "k8s.io/api/coordination/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -64,6 +64,11 @@ func main() { } klog.Infof("Version : %s", version) + podName := os.Getenv("POD_NAME") + if podName == "" { + klog.Fatal("POD_NAME environment variable is not set") + } + addr := *httpEndpoint var config *rest.Config var err error @@ -133,39 +138,50 @@ func main() { workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax), true, /* retryFailure */ ) + leaseChannel := make(chan *v1.Lease) + go leaseHandler(podName, mc, leaseChannel) + + leaseInformer := informerFactory.Coordination().V1().Leases().Informer() + leaseInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj, newObj interface{}) { + lease, ok := newObj.(*v1.Lease) + if !ok { + klog.ErrorS(nil, "Failed to process object, expected it to be a Lease", "obj", newObj) + return + } + if lease.Name == "external-resizer-ebs-csi-aws-com" { + leaseChannel <- lease + } + }, + }) + informerFactory.Start(wait.NeverStop) + leaseInformer.Run(wait.NeverStop) +} - run := func(ctx context.Context) { - informerFactory.Start(wait.NeverStop) - mc.Run(*workers, ctx) - } +func leaseHandler(podName string, mc controller.ModifyController, leaseChannel chan *v1.Lease) { + var cancel context.CancelFunc = nil - if !*enableLeaderElection { - run(context.TODO()) - } else { - // Ensure volume-modifier-for-k8s and external-resizer sidecars always elect the same leader - // by putting them on the same lease that is identified by the lock name. - externalResizerLockName := "external-resizer-" + util.SanitizeName(driverName) - leKubeClient, err := kubernetes.NewForConfig(config) - if err != nil { - klog.Fatal(err.Error()) - } - le := leaderelection.NewLeaderElection(leKubeClient, externalResizerLockName, run) - if *httpEndpoint != "" { - le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout) - } - - if *leaderElectionNamespace != "" { - le.WithNamespace(*leaderElectionNamespace) - } + for lease := range leaseChannel { + currentLeader := *lease.Spec.HolderIdentity - le.WithLeaseDuration(*leaderElectionLeaseDuration) - le.WithRenewDeadline(*leaderElectionRenewDeadline) - le.WithRetryPeriod(*leaderElectionRetryPeriod) + klog.V(6).InfoS("leaseHandler: Lease updated", "currentLeader", currentLeader, "podName", podName) - if err := le.Run(); err != nil { - klog.Fatalf("error initializing leader election: %v", err) + if currentLeader == podName && cancel == nil { + var ctx context.Context + ctx, cancel = context.WithCancel(context.Background()) + klog.InfoS("leaseHandler: Starting ModifyController", "podName", podName, "currentLeader", currentLeader) + go mc.Run(*workers, ctx) + } else if currentLeader != podName && cancel != nil { + klog.InfoS("leaseHandler: Stopping ModifyController", "podName", podName, "currentLeader", currentLeader) + cancel() + cancel = nil } } + + // Ensure cancel is called if it's not nil when we exit the function + if cancel != nil { + cancel() + } } func getDriverName(client csi.Client, timeout time.Duration) (string, error) { diff --git a/cmd/main_test.go b/cmd/main_test.go new file mode 100644 index 0000000..2e81ded --- /dev/null +++ b/cmd/main_test.go @@ -0,0 +1,146 @@ +package main + +import ( + "context" + "testing" + "time" + + "github.com/awslabs/volume-modifier-for-k8s/pkg/controller" + + v1 "k8s.io/api/coordination/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/golang/mock/gomock" +) + +func TestLeaseHandler_PodIsLeader(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockModifyController := controller.NewMockModifyController(ctrl) + + signalChannel := make(chan struct{}, 1) + mockModifyController.EXPECT().Run(gomock.Any(), gomock.Any()).Do( + func(_ int, _ context.Context) { + t.Log("Run was called") + signalChannel <- struct{}{} + }, + ).Times(1) + + lease := newLease("external-resizer-ebs-csi-aws-com", "test-pod") + runLeaseHandlerAndSendLease(t, "test-pod", mockModifyController, lease) + + select { + case <-signalChannel: + t.Log("Signal received, test passed") + case <-time.After(time.Second): + t.Fatal("Timeout waiting for Run to be called") + } +} + +func TestLeaseHandler_PodIsNotLeader(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockModifyController := controller.NewMockModifyController(ctrl) + mockModifyController.EXPECT().Run(gomock.Any(), gomock.Any()).Times(0) + + lease := newLease("external-resizer-ebs-csi-aws-com", "other-pod") + runLeaseHandlerAndSendLease(t, "test-pod", mockModifyController, lease) +} + +func TestLeaseHandler_PodRegainsLeadership(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockModifyController := controller.NewMockModifyController(ctrl) + + firstSignalChannel := make(chan struct{}, 1) + secondSignalChannel := make(chan struct{}, 1) + + // Expect Run to be called once initially, signal via first channel + mockModifyController.EXPECT().Run(gomock.Any(), gomock.Any()).Do( + func(_ int, _ context.Context) { + t.Log("First Run was called") + firstSignalChannel <- struct{}{} + }, + ).Times(1) + + // Expect Run to be called again after leadership loss, signal via second channel + mockModifyController.EXPECT().Run(gomock.Any(), gomock.Any()).Do( + func(_ int, _ context.Context) { + t.Log("Second Run was called") + secondSignalChannel <- struct{}{} + }, + ).Times(1) + + leaseChannel := make(chan *v1.Lease, 3) + defer close(leaseChannel) + + go leaseHandler("test-pod", mockModifyController, leaseChannel) + + // Become the leader + leaseChannel <- newLease("external-resizer-ebs-csi-aws-com", "test-pod") + <-firstSignalChannel + // Lose the leadership + leaseChannel <- newLease("external-resizer-ebs-csi-aws-com", "other-pod") + // Regain the leadership + leaseChannel <- newLease("external-resizer-ebs-csi-aws-com", "test-pod") + + select { + case <-secondSignalChannel: + t.Log("Test passed, second Run was called") + case <-time.After(time.Second): + t.Fatal("Timeout waiting for Run to be called") + } +} + +func TestLeaseHandler_RunCalledOnce(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockModifyController := controller.NewMockModifyController(ctrl) + + signalChannel := make(chan struct{}, 1) + + mockModifyController.EXPECT().Run(gomock.Any(), gomock.Any()).Do( + func(_ int, _ context.Context) { + t.Log("Run was called") + signalChannel <- struct{}{} + }, + ).Times(1) + + leaseChannel := make(chan *v1.Lease, 3) + defer close(leaseChannel) + + go leaseHandler("test-pod", mockModifyController, leaseChannel) + + for i := 0; i < 10; i++ { + leaseChannel <- newLease("external-resizer-ebs-csi-aws-com", "test-pod") + } + + select { + case <-signalChannel: + t.Log("Signal received, test passed") + case <-time.After(time.Second): + t.Fatal("Timeout waiting for Run to be called") + } +} + +func newLease(name, holderIdentity string) *v1.Lease { + return &v1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: v1.LeaseSpec{ + HolderIdentity: &holderIdentity, + }, + } +} + +func runLeaseHandlerAndSendLease(t *testing.T, podName string, mockModifyController *controller.MockModifyController, lease *v1.Lease) { + leaseChannel := make(chan *v1.Lease, 1) + go leaseHandler(podName, mockModifyController, leaseChannel) + leaseChannel <- lease + close(leaseChannel) +} diff --git a/go.mod b/go.mod index 3896573..6b86df2 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/go-openapi/swag v0.22.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/golang/mock v1.6.0 github.com/google/gnostic v0.5.7-v3refs // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.3.0 // indirect diff --git a/go.sum b/go.sum index 83129ea..052785e 100644 --- a/go.sum +++ b/go.sum @@ -107,6 +107,8 @@ github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -264,6 +266,7 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -305,6 +308,7 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -334,6 +338,7 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= @@ -358,6 +363,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -392,7 +398,9 @@ golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -459,6 +467,7 @@ golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/controller/mock_controller.go b/pkg/controller/mock_controller.go new file mode 100644 index 0000000..e845a19 --- /dev/null +++ b/pkg/controller/mock_controller.go @@ -0,0 +1,47 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: controller.go + +// Package controller is a generated GoMock package. +package controller + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockModifyController is a mock of ModifyController interface. +type MockModifyController struct { + ctrl *gomock.Controller + recorder *MockModifyControllerMockRecorder +} + +// MockModifyControllerMockRecorder is the mock recorder for MockModifyController. +type MockModifyControllerMockRecorder struct { + mock *MockModifyController +} + +// NewMockModifyController creates a new mock instance. +func NewMockModifyController(ctrl *gomock.Controller) *MockModifyController { + mock := &MockModifyController{ctrl: ctrl} + mock.recorder = &MockModifyControllerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockModifyController) EXPECT() *MockModifyControllerMockRecorder { + return m.recorder +} + +// Run mocks base method. +func (m *MockModifyController) Run(arg0 int, arg1 context.Context) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Run", arg0, arg1) +} + +// Run indicates an expected call of Run. +func (mr *MockModifyControllerMockRecorder) Run(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockModifyController)(nil).Run), arg0, arg1) +}