Skip to content

Commit

Permalink
sidecar: use leader election when CONTROLLER_SERVICE is supported
Browse files Browse the repository at this point in the history
When a CSI-driver provides the CONTROLLER_SERVICE capability, the
sidecar will try to become the leader by obtaining a Lease based on the
name of the CSI-driver.

Signed-off-by: Niels de Vos <[email protected]>
  • Loading branch information
nixpanic committed Dec 8, 2023
1 parent 1092b85 commit 661910e
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 3 deletions.
29 changes: 29 additions & 0 deletions sidecar/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Client interface {
GetGRPCClient() *grpc.ClientConn
Probe() error
GetDriverName() (string, error)
HasControllerService() (bool, error)
}

// clientImpl holds the GRPC connenction details
Expand Down Expand Up @@ -123,6 +124,34 @@ func (c *clientImpl) probeOnce() (bool, error) {
return r.GetValue(), nil
}

// HasControllerService gets the driver name from the driver
func (c *clientImpl) HasControllerService() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.Timeout)
defer cancel()

identityClient := identity.NewIdentityClient(c.client)

req := identity.GetCapabilitiesRequest{}
rsp, err := identityClient.GetCapabilities(ctx, &req)
if err != nil {
return false, err
}

caps := rsp.GetCapabilities()
if len(caps) == 0 {
return false, errors.New("driver does not have any capabilities")
}

for _, c := range caps {
svc := c.GetService()
if svc != nil && svc.GetType() == identity.Capability_Service_CONTROLLER_SERVICE {
return true, nil
}
}

return false, nil
}

// GetDriverName gets the driver name from the driver
func (c *clientImpl) GetDriverName() (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.Timeout)
Expand Down
8 changes: 8 additions & 0 deletions sidecar/internal/csiaddonsnode/csiaddonsnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ import (
type mockClient struct {
// Driver contains the drivername obtained with GetDriverName()
driver string

// isController is set to true when the CSI-plugin supports the
// CONTROLLER_SERVICE
isController bool
}

func NewMockClient(driver string) client.Client {
Expand All @@ -50,6 +54,10 @@ func (mc *mockClient) GetDriverName() (string, error) {
return mc.driver, nil
}

func (mc *mockClient) HasControllerService() (bool, error) {
return mc.isController, nil
}

func Test_getCSIAddonsNode(t *testing.T) {
var (
podName = "pod"
Expand Down
58 changes: 55 additions & 3 deletions sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@ limitations under the License.
package main

import (
"context"
"flag"
"time"

"github.com/csi-addons/kubernetes-csi-addons/internal/sidecar/service"
"github.com/csi-addons/kubernetes-csi-addons/internal/util"
"github.com/csi-addons/kubernetes-csi-addons/internal/version"
"github.com/csi-addons/kubernetes-csi-addons/sidecar/internal/client"
"github.com/csi-addons/kubernetes-csi-addons/sidecar/internal/csiaddonsnode"
"github.com/csi-addons/kubernetes-csi-addons/sidecar/internal/server"
"github.com/csi-addons/kubernetes-csi-addons/sidecar/internal/util"
sideutil "github.com/csi-addons/kubernetes-csi-addons/sidecar/internal/util"

"github.com/kubernetes-csi/csi-lib-utils/leaderelection"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
Expand All @@ -48,6 +51,11 @@ func main() {
podNamespace = flag.String("namespace", "", "namespace of the Pod that contains this sidecar")
podUID = flag.String("pod-uid", "", "UID of the Pod that contains this sidecar")
showVersion = flag.Bool("version", false, "Print Version details")

leaderElectionNamespace = flag.String("leader-election-namespace", "", "The namespace where the leader election resource exists. Defaults to the pod namespace if not set.")
leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Duration, in seconds, that non-leader candidates will wait to force acquire leadership. Defaults to 15 seconds.")
leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.")
leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 5*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.")
)
klog.InitFlags(nil)

Expand All @@ -62,7 +70,7 @@ func main() {
return
}

controllerEndpoint, err := util.BuildEndpointURL(*controllerIP, *controllerPort, *podName, *podNamespace)
controllerEndpoint, err := sideutil.BuildEndpointURL(*controllerIP, *controllerPort, *podName, *podNamespace)
if err != nil {
klog.Fatalf("Failed to validate controller endpoint: %v", err)
}
Expand Down Expand Up @@ -107,5 +115,49 @@ func main() {
sidecarServer.RegisterService(service.NewNetworkFenceServer(csiClient.GetGRPCClient(), kubeClient))
sidecarServer.RegisterService(service.NewReplicationServer(csiClient.GetGRPCClient(), kubeClient))

sidecarServer.Start()
isController, err := csiClient.HasControllerService()
if err != nil {
klog.Fatalf("Failed to check if the CSI-plugin supports CONTROLLER_SERVICE: %v", err)
}

// do not use leaderelection when the CSI-plugin does not have
// CONTROLLER_SERVICE
if !isController {
klog.Info("The CSI-plugin does not have the CSI-Addons CONTROLLER_SERVICE capability, not running leader election")
sidecarServer.Start()
} else {
// start the server in a go-routine so that the controller can
// connect to it, even if this service is not the leaser
go sidecarServer.Start()

driver, err := csiClient.GetDriverName()
if err != nil {
klog.Fatalf("Failed to get the drivername from the CSI-plugin: %v", err)
}

leaseName := util.NormalizeLeaseName(driver) + "-csi-addons"
le := leaderelection.NewLeaderElection(kubeClient, leaseName, func(context.Context) {
klog.Info("Yay! I became the leader. Send me the CONTROL_SERVICE requests.")
})

if *podName != "" {
le.WithIdentity(*podName)
}

if *leaderElectionNamespace != "" {
le.WithNamespace(*leaderElectionNamespace)
}

le.WithLeaseDuration(*leaderElectionLeaseDuration)
le.WithRenewDeadline(*leaderElectionRenewDeadline)
le.WithRetryPeriod(*leaderElectionRetryPeriod)

// le.Run() is not expected to return on success
err = le.Run()
if err != nil {
klog.Fatalf("Failed to run as a leader: %v", err)
}

klog.Fatal("Mutiny! It seems someone else became captain, but left me alive...")
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 661910e

Please sign in to comment.