Skip to content

Commit

Permalink
Make registration path mandatory, remove kubeconfig parameter
Browse files Browse the repository at this point in the history
Signed-off-by: Jose A. Rivera <[email protected]>
  • Loading branch information
jarrpa authored and msau42 committed Dec 27, 2018
1 parent 2770302 commit 2f949d5
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 345 deletions.
30 changes: 6 additions & 24 deletions cmd/node-driver-registrar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"time"

"github.com/golang/glog"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1"

"github.com/kubernetes-csi/node-driver-registrar/pkg/connection"
Expand All @@ -45,7 +43,6 @@ const (

// Command line flags
var (
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
connectionTimeout = flag.Duration("connection-timeout", 1*time.Minute, "Timeout for waiting for CSI driver socket.")
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
kubeletRegistrationPath = flag.String("kubelet-registration-path", "",
Expand Down Expand Up @@ -104,6 +101,11 @@ func main() {
flag.Set("logtostderr", "true")
flag.Parse()

if *kubeletRegistrationPath == "" {
glog.Error("kubelet-registration-path is a required parameter")
os.Exit(1)
}

if *showVersion {
fmt.Println(os.Args[0], version)
return
Expand Down Expand Up @@ -133,26 +135,6 @@ func main() {
}
glog.V(2).Infof("CSI driver name: %q", csiDriverName)

// Create the client config. Use kubeconfig if given, otherwise assume
// in-cluster.
glog.V(1).Infof("Loading kubeconfig.")
config, err := buildConfig(*kubeconfig)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}

// Run forever
nodeRegister(config, csiConn, csiDriverName)
}

func buildConfig(kubeconfig string) (*rest.Config, error) {
if kubeconfig != "" {
return clientcmd.BuildConfigFromFlags("", kubeconfig)
}

// Return config object which uses the service account kubernetes gives to
// pods. It's intended for clients that are running inside a pod running on
// kubernetes.
return rest.InClusterConfig()
nodeRegister(csiConn, csiDriverName)
}
299 changes: 29 additions & 270 deletions cmd/node-driver-registrar/node_register.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,20 @@ package main

import (
"context"
"encoding/json"
"fmt"
"net"
"os"
"os/signal"
"time"

"google.golang.org/grpc"

"github.com/golang/glog"
"golang.org/x/sys/unix"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/retry"
registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1"

"github.com/kubernetes-csi/node-driver-registrar/pkg/connection"
)

func nodeRegister(
config *rest.Config,
csiConn connection.CSIConnection,
csiDriverName string,
) {
Expand All @@ -65,272 +56,40 @@ func nodeRegister(
// When kubeletRegistrationPath is specified then driver-registrar ONLY acts
// as gRPC server which replies to registration requests initiated by kubelet's
// pluginswatcher infrastructure. Node labeling is done by kubelet's csi code.
if *kubeletRegistrationPath != "" {
registrar := newRegistrationServer(csiDriverName, *kubeletRegistrationPath, supportedVersions)
socketPath := fmt.Sprintf("/registration/%s-reg.sock", csiDriverName)
fi, err := os.Stat(socketPath)
if err == nil && (fi.Mode()&os.ModeSocket) != 0 {
// Remove any socket, stale or not, but fall through for other files
if err := os.Remove(socketPath); err != nil {
glog.Errorf("failed to remove stale socket %s with error: %+v", socketPath, err)
os.Exit(1)
}
}
if err != nil && !os.IsNotExist(err) {
glog.Errorf("failed to stat the socket %s with error: %+v", socketPath, err)
os.Exit(1)
}
// Default to only user accessible socket, caller can open up later if desired
oldmask := unix.Umask(0077)

glog.Infof("Starting Registration Server at: %s\n", socketPath)
lis, err := net.Listen("unix", socketPath)
if err != nil {
glog.Errorf("failed to listen on socket: %s with error: %+v", socketPath, err)
os.Exit(1)
}
unix.Umask(oldmask)
glog.Infof("Registration Server started at: %s\n", socketPath)
grpcServer := grpc.NewServer()
// Registers kubelet plugin watcher api.
registerapi.RegisterRegistrationServer(grpcServer, registrar)

// Starts service
if err := grpcServer.Serve(lis); err != nil {
glog.Errorf("Registration Server stopped serving: %v", err)
registrar := newRegistrationServer(csiDriverName, *kubeletRegistrationPath, supportedVersions)
socketPath := fmt.Sprintf("/registration/%s-reg.sock", csiDriverName)
fi, err := os.Stat(socketPath)
if err == nil && (fi.Mode()&os.ModeSocket) != 0 {
// Remove any socket, stale or not, but fall through for other files
if err := os.Remove(socketPath); err != nil {
glog.Errorf("failed to remove stale socket %s with error: %+v", socketPath, err)
os.Exit(1)
}
// If gRPC server is gracefully shutdown, exit
os.Exit(0)
} else { // only apply Node label update when kubelet plugin not used
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}

glog.V(1).Infof("Attempt to update node annotation if needed")
k8sNodesClient := clientset.CoreV1().Nodes()

// Set up goroutine to cleanup (aka deregister) on termination.
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
<-c
err := getVerifyAndDeleteNodeId(
k8sNodeName,
k8sNodesClient,
csiDriverName)
if err != nil {
glog.Warning(err)
}
os.Exit(1)
}()

// This program is intended to run as a side-car container inside a
// Kubernetes DaemonSet. Kubernetes DaemonSet only have one RestartPolicy,
// always, meaning as soon as this container terminates, it will be started
// again. Therefore, this program will loop indefientley and periodically
// update the node annotation.
// The CSI driver name and node ID are assumed to be immutable, and are not
// refetched on subsequent loop iterations.
for {
err := getVerifyAndAddNodeId(
k8sNodeName,
k8sNodesClient,
csiDriverName,
csiDriverNodeId)
if err != nil {
glog.Warning(err)
}
time.Sleep(sleepDuration)
}
}
}

// Fetches Kubernetes node API object corresponding to k8sNodeName.
// If the csiDriverName and csiDriverNodeId are not present in the node
// annotation, this method adds it.
func getVerifyAndAddNodeId(
k8sNodeName string,
k8sNodesClient corev1.NodeInterface,
csiDriverName string,
csiDriverNodeId string,
) error {
// Add or update annotation on Node object
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Retrieve the latest version of Node before attempting update, so that
// existing changes are not overwritten. RetryOnConflict uses
// exponential backoff to avoid exhausting the apiserver.
result, getErr := k8sNodesClient.Get(k8sNodeName, metav1.GetOptions{})
if getErr != nil {
glog.Errorf("Failed to get latest version of Node: %v", getErr)
return getErr // do not wrap error
}

var previousAnnotationValue string
if result.ObjectMeta.Annotations != nil {
previousAnnotationValue =
result.ObjectMeta.Annotations[annotationKey]
glog.V(3).Infof(
"previousAnnotationValue=%q", previousAnnotationValue)
}

existingDriverMap := map[string]string{}
if previousAnnotationValue != "" {
// Parse previousAnnotationValue as JSON
if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
return fmt.Errorf(
"Failed to parse node's %q annotation value (%q) err=%v",
annotationKey,
previousAnnotationValue,
err)
}
}

if val, ok := existingDriverMap[csiDriverName]; ok {
if val == csiDriverNodeId {
// Value already exists in node annotation, nothing more to do
glog.V(1).Infof(
"The key value {%q: %q} alredy eixst in node %q annotation, no need to update: %v",
csiDriverName,
csiDriverNodeId,
annotationKey,
previousAnnotationValue)
return nil
}
}

// Add/update annotation value
existingDriverMap[csiDriverName] = csiDriverNodeId
jsonObj, err := json.Marshal(existingDriverMap)
if err != nil {
return fmt.Errorf(
"Failed while trying to add key value {%q: %q} to node %q annotation. Existing value: %v",
csiDriverName,
csiDriverNodeId,
annotationKey,
previousAnnotationValue)
}

result.ObjectMeta.Annotations = cloneAndAddAnnotation(
result.ObjectMeta.Annotations,
annotationKey,
string(jsonObj))
_, updateErr := k8sNodesClient.Update(result)
if updateErr == nil {
fmt.Printf(
"Updated node %q successfully for CSI driver %q and CSI node name %q",
k8sNodeName,
csiDriverName,
csiDriverNodeId)
}
return updateErr // do not wrap error
})
if retryErr != nil {
return fmt.Errorf("Node update failed: %v", retryErr)
}
return nil
}

// Fetches Kubernetes node API object corresponding to k8sNodeName.
// If the csiDriverName is present in the node annotation, it is removed.
func getVerifyAndDeleteNodeId(
k8sNodeName string,
k8sNodesClient corev1.NodeInterface,
csiDriverName string) error {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Retrieve the latest version of Node before attempting update, so that
// existing changes are not overwritten. RetryOnConflict uses
// exponential backoff to avoid exhausting the apiserver.
result, getErr := k8sNodesClient.Get(k8sNodeName, metav1.GetOptions{})
if getErr != nil {
glog.Errorf("Failed to get latest version of Node: %v", getErr)
return getErr // do not wrap error
}

var previousAnnotationValue string
if result.ObjectMeta.Annotations != nil {
previousAnnotationValue =
result.ObjectMeta.Annotations[annotationKey]
glog.V(3).Infof(
"previousAnnotationValue=%q", previousAnnotationValue)
}

existingDriverMap := map[string]string{}
if previousAnnotationValue == "" {
// Value already exists in node annotation, nothing more to do
glog.V(1).Infof(
"The key %q does not exist in node %q annotation, no need to cleanup.",
csiDriverName,
annotationKey)
return nil
}

// Parse previousAnnotationValue as JSON
if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
return fmt.Errorf(
"Failed to parse node's %q annotation value (%q) err=%v",
annotationKey,
previousAnnotationValue,
err)
}

if _, ok := existingDriverMap[csiDriverName]; !ok {
// Value already exists in node annotation, nothing more to do
glog.V(1).Infof(
"The key %q does not eixst in node %q annotation, no need to cleanup: %v",
csiDriverName,
annotationKey,
previousAnnotationValue)
return nil
}

// Add/update annotation value
delete(existingDriverMap, csiDriverName)
jsonObj, err := json.Marshal(existingDriverMap)
if err != nil {
return fmt.Errorf(
"Failed while trying to remove key %q from node %q annotation. Existing data: %v",
csiDriverName,
annotationKey,
previousAnnotationValue)
}

result.ObjectMeta.Annotations = cloneAndAddAnnotation(
result.ObjectMeta.Annotations,
annotationKey,
string(jsonObj))
_, updateErr := k8sNodesClient.Update(result)
if updateErr == nil {
fmt.Printf(
"Updated node %q annotation to remove CSI driver %q.",
k8sNodeName,
csiDriverName)
}
return updateErr // do not wrap error
})
if retryErr != nil {
return fmt.Errorf("Node update failed: %v", retryErr)
if err != nil && !os.IsNotExist(err) {
glog.Errorf("failed to stat the socket %s with error: %+v", socketPath, err)
os.Exit(1)
}
return nil
}
// Default to only user accessible socket, caller can open up later if desired
oldmask := unix.Umask(0077)

// Clones the given map and returns a new map with the given key and value added.
// Returns the given map, if annotationKey is empty.
func cloneAndAddAnnotation(
annotations map[string]string,
annotationKey,
annotationValue string) map[string]string {
if annotationKey == "" {
// Don't need to add an annotation.
return annotations
glog.Infof("Starting Registration Server at: %s\n", socketPath)
lis, err := net.Listen("unix", socketPath)
if err != nil {
glog.Errorf("failed to listen on socket: %s with error: %+v", socketPath, err)
os.Exit(1)
}
// Clone.
newAnnotations := map[string]string{}
for key, value := range annotations {
newAnnotations[key] = value
unix.Umask(oldmask)
glog.Infof("Registration Server started at: %s\n", socketPath)
grpcServer := grpc.NewServer()
// Registers kubelet plugin watcher api.
registerapi.RegisterRegistrationServer(grpcServer, registrar)

// Starts service
if err := grpcServer.Serve(lis); err != nil {
glog.Errorf("Registration Server stopped serving: %v", err)
os.Exit(1)
}
newAnnotations[annotationKey] = annotationValue
return newAnnotations
// If gRPC server is gracefully shutdown, exit
os.Exit(0)
}
Loading

0 comments on commit 2f949d5

Please sign in to comment.