From b0c9bcd2460f318843b5972f0f494676c22a08cf Mon Sep 17 00:00:00 2001 From: Bipul Adhikari Date: Mon, 28 Oct 2024 17:34:40 +0545 Subject: [PATCH] Add support for TLS Signed-off-by: Bipul Adhikari --- cmd/manager/main.go | 9 +- config/manager/manager.yaml | 8 + deploy/controller/csi-addons-config.yaml | 16 ++ deploy/controller/setup-controller.yaml | 8 + internal/connection/connection.go | 21 ++- .../csiaddons/csiaddonsnode_controller.go | 7 +- internal/kubernetes/namespace.go | 21 +++ internal/kubernetes/token/grpc.go | 151 ++++++++++++++++++ sidecar/internal/server/server.go | 30 +++- sidecar/main.go | 17 +- tools/go.mod | 5 +- 11 files changed, 272 insertions(+), 21 deletions(-) create mode 100644 internal/kubernetes/namespace.go create mode 100644 internal/kubernetes/token/grpc.go diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 3183d4bb1..20b109d41 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -77,6 +77,7 @@ func main() { enableAdmissionWebhooks bool ctx = context.Background() cfg = util.NewConfig() + enableTLS bool ) flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") @@ -91,6 +92,7 @@ func main() { flag.StringVar(&cfg.Namespace, "namespace", cfg.Namespace, "Namespace where the CSIAddons pod is deployed") flag.BoolVar(&enableAdmissionWebhooks, "enable-admission-webhooks", false, "[DEPRECATED] Enable the admission webhooks") flag.BoolVar(&showVersion, "version", false, "Print Version details") + flag.BoolVar(&enableTLS, "tls", true, "Enable TLS(enabled by default)") opts := zap.Options{ Development: true, TimeEncoder: zapcore.ISO8601TimeEncoder, @@ -146,9 +148,10 @@ func main() { MaxConcurrentReconciles: cfg.MaxConcurrentReconciles, } if err = (&controllers.CSIAddonsNodeReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - ConnPool: connPool, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + ConnPool: connPool, + EnableTLS: enableTLS, }).SetupWithManager(mgr, ctrlOptions); err != nil { setupLog.Error(err, "unable to create controller", "controller", "CSIAddonsNode") os.Exit(1) diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index fd1c45f20..9c448f15f 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -40,6 +40,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace + volumeMounts: + - name: ca-cert + mountPath: /etc/tls/ca + readOnly: true securityContext: allowPrivilegeEscalation: false readOnlyRootFilesystem: true @@ -64,5 +68,9 @@ spec: requests: cpu: 10m memory: 64Mi + volumes: + - name: tls-secret + secret: + secretName: controller-manager-tls serviceAccountName: csi-addons-controller-manager terminationGracePeriodSeconds: 10 diff --git a/deploy/controller/csi-addons-config.yaml b/deploy/controller/csi-addons-config.yaml index 85246e346..a466513b2 100644 --- a/deploy/controller/csi-addons-config.yaml +++ b/deploy/controller/csi-addons-config.yaml @@ -8,3 +8,19 @@ metadata: data: "reclaim-space-timeout": "3m" "max-concurrent-reconciles": "100" +--- +apiVersion: v1 +kind: Service +metadata: + name: csi-addons-sidecar + # replace the namespace with the namespace where the operator is deployed. + namespace: csi-addons-system + annotations: + service.alpha.openshift.io/serving-cert-secret-name: csi-addons-sidecar-tls +spec: + ports: + - name: grpc + port: 8443 + targetPort: 8443 + selector: + app: csi-addons-sidecar diff --git a/deploy/controller/setup-controller.yaml b/deploy/controller/setup-controller.yaml index d602b6024..78894a551 100644 --- a/deploy/controller/setup-controller.yaml +++ b/deploy/controller/setup-controller.yaml @@ -104,7 +104,15 @@ spec: securityContext: allowPrivilegeEscalation: false readOnlyRootFilesystem: true + volumeMounts: + - mountPath: /etc/tls/ca + name: ca-cert + readOnly: true securityContext: runAsNonRoot: true serviceAccountName: csi-addons-controller-manager terminationGracePeriodSeconds: 10 + volumes: + - name: tls-secret + secret: + secretName: controller-manager-tls diff --git a/internal/connection/connection.go b/internal/connection/connection.go index 0b24f0210..24c8a0e84 100644 --- a/internal/connection/connection.go +++ b/internal/connection/connection.go @@ -18,11 +18,15 @@ package connection import ( "context" + "crypto/x509" + "fmt" "time" + "github.com/csi-addons/kubernetes-csi-addons/internal/kubernetes/token" "github.com/csi-addons/spec/lib/go/identity" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/credentials" + "sigs.k8s.io/controller-runtime/pkg/client" ) // Connection struct consists of to NodeID, DriverName, Capabilities for the controller @@ -39,11 +43,22 @@ type Connection struct { // NewConnection establishes connection with sidecar, fetches capability and returns Connection object // filled with required information. -func NewConnection(ctx context.Context, endpoint, nodeID, driverName, namespace, podName string) (*Connection, error) { +func NewConnection(ctx context.Context, endpoint, nodeID, driverName, namespace, podName string, client client.Client, enableTLS bool) (*Connection, error) { opts := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithIdleTimeout(time.Duration(0)), } + opts = append(opts, token.WithServiceAccountToken(client, namespace, "csi-addons-sa")) + if enableTLS { + + caFile, caError := token.GetServerCert() + if caError != nil { + panic(fmt.Errorf("failed to get server cert %v", caError)) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM([]byte(caFile)) + creds := credentials.NewClientTLSFromCert(caCertPool, "") + opts = append(opts, grpc.WithTransportCredentials(creds)) + } cc, err := grpc.NewClient(endpoint, opts...) if err != nil { return nil, err diff --git a/internal/controller/csiaddons/csiaddonsnode_controller.go b/internal/controller/csiaddons/csiaddonsnode_controller.go index d5f0f763e..ce1f956a3 100644 --- a/internal/controller/csiaddons/csiaddonsnode_controller.go +++ b/internal/controller/csiaddons/csiaddonsnode_controller.go @@ -49,8 +49,9 @@ var ( // CSIAddonsNodeReconciler reconciles a CSIAddonsNode object type CSIAddonsNodeReconciler struct { client.Client - Scheme *runtime.Scheme - ConnPool *connection.ConnectionPool + Scheme *runtime.Scheme + ConnPool *connection.ConnectionPool + EnableTLS bool } //+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch @@ -120,7 +121,7 @@ func (r *CSIAddonsNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reques } logger.Info("Connecting to sidecar") - newConn, err := connection.NewConnection(ctx, endPoint, nodeID, driverName, csiAddonsNode.Namespace, csiAddonsNode.Name) + newConn, err := connection.NewConnection(ctx, endPoint, nodeID, driverName, csiAddonsNode.Namespace, csiAddonsNode.Name, r.Client, r.EnableTLS) if err != nil { logger.Error(err, "Failed to establish connection with sidecar") diff --git a/internal/kubernetes/namespace.go b/internal/kubernetes/namespace.go new file mode 100644 index 000000000..1c6e6514b --- /dev/null +++ b/internal/kubernetes/namespace.go @@ -0,0 +1,21 @@ +package kubernetes + +import ( + "io" + "os" +) + +func GetNamespace() (string, error) { + namespaceFile := "/var/run/secrets/kubernetes.io/serviceaccount/namespace" + file, err := os.Open(namespaceFile) + if err != nil { + return "", err + } + defer file.Close() + + data, err := io.ReadAll(file) + if err != nil { + return "", err + } + return string(data), nil +} diff --git a/internal/kubernetes/token/grpc.go b/internal/kubernetes/token/grpc.go new file mode 100644 index 000000000..907fa19b0 --- /dev/null +++ b/internal/kubernetes/token/grpc.go @@ -0,0 +1,151 @@ +/* +Copyright 2024 The Kubernetes-CSI-Addons Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package token + +import ( + "context" + "fmt" + "io" + "os" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + authv1 "k8s.io/api/authentication/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type tokenResolver struct { + kubeclient client.Client + namespace string + serviceAccount string + + token string + expiration metav1.Time +} + +func WithServiceAccountToken(client client.Client, namespace, serviceAccount string) grpc.DialOption { + tr := tokenResolver{ + kubeclient: client, + namespace: namespace, + serviceAccount: serviceAccount, + expiration: metav1.Now(), + } + + return grpc.WithUnaryInterceptor(tr.addAuthorizationHeader) +} + +func (tr *tokenResolver) addAuthorizationHeader(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + token, err := tr.getToken(ctx) + if err != nil { + return err + } + + authCtx := metadata.AppendToOutgoingContext(ctx, "Authorization", "Bearer "+token) + return invoker(authCtx, method, req, reply, cc, opts...) +} + +func (tr *tokenResolver) getToken(ctx context.Context) (string, error) { + now := metav1.Now() + if tr.expiration.Before(&now) { + // token expired + return tr.refreshToken(ctx) + } + + return tr.token, nil +} + +func (tr *tokenResolver) refreshToken(ctx context.Context) (string, error) { + treq := &authv1.TokenRequest{ + Spec: authv1.TokenRequestSpec{ + Audiences: []string{"csi-addons"}, + }, + } + + err := tr.kubeclient.Create(ctx, treq) + if err != nil { + return "", err + } + + tr.token = treq.Status.Token + tr.expiration = treq.Status.ExpirationTimestamp + + return tr.token, nil +} + +func AuthorizationInterceptor(kubeclient kubernetes.Clientset) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + if err := authorizeConnection(kubeclient, ctx); err != nil { + return nil, err + } + return handler(ctx, req) + } +} + +func authorizeConnection(kubeclient kubernetes.Clientset, ctx context.Context) error { + + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return status.Errorf(codes.Unauthenticated, "missing metadata") + } + + authHeader, ok := md["authorization"] + if !ok || len(authHeader) == 0 { + return status.Errorf(codes.Unauthenticated, "missing authorization token") + } + + token := authHeader[0] + isValidated, err := validateBearerToken(token, kubeclient) + if !isValidated || err != nil { + return status.Errorf(codes.Unauthenticated, "invalid token") + } + return nil +} + +func validateBearerToken(token string, kubeclient kubernetes.Clientset) (bool, error) { + tokenReview := &authv1.TokenReview{ + Spec: authv1.TokenReviewSpec{ + Token: token, + }, + } + result, err := kubeclient.AuthenticationV1().TokenReviews().Create(context.TODO(), tokenReview, metav1.CreateOptions{}) + if err != nil { + return false, fmt.Errorf("failed to review token %v", err) + } + + if result.Status.Authenticated { + return true, nil + } + return false, nil +} + +func GetServerCert() (string, error) { + certFile := "/etc/tls/ca.crt" + file, err := os.Open(certFile) + if err != nil { + return "", err + } + defer file.Close() + + data, err := io.ReadAll(file) + if err != nil { + return "", err + } + return string(data), nil +} diff --git a/sidecar/internal/server/server.go b/sidecar/internal/server/server.go index 112554683..f859b3b45 100644 --- a/sidecar/internal/server/server.go +++ b/sidecar/internal/server/server.go @@ -18,9 +18,13 @@ package server import ( "errors" + "fmt" "net" + "github.com/csi-addons/kubernetes-csi-addons/internal/kubernetes/token" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + k8s "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" ) @@ -38,15 +42,17 @@ type SidecarServer struct { // URL components to listen on the tcp port scheme string endpoint string + client k8s.Clientset - server *grpc.Server - services []SidecarService + server *grpc.Server + services []SidecarService + enableTLS bool } // NewSidecarServer create a new SidecarServer on the given IP-address and // port. If the IP-address is an empty string, the server will listen on all // available IP-addresses. Only tcp ports are supported. -func NewSidecarServer(ip, port string) *SidecarServer { +func NewSidecarServer(ip string, port int, client k8s.Clientset, enableTLS bool) *SidecarServer { ss := &SidecarServer{} if ss.services == nil { @@ -54,8 +60,9 @@ func NewSidecarServer(ip, port string) *SidecarServer { } ss.scheme = "tcp" - ss.endpoint = ip + ":" + port - + ss.endpoint = ip + ":" + fmt.Sprint(port) + ss.client = client + ss.enableTLS = enableTLS return ss } @@ -69,8 +76,17 @@ func (ss *SidecarServer) RegisterService(svc SidecarService) { // Init creates the internal gRPC server, and registers the SidecarServices. // and starts gRPC server. func (ss *SidecarServer) Start() { - // create the gRPC server and register services - ss.server = grpc.NewServer() + if ss.enableTLS { + creds, err := credentials.NewServerTLSFromFile("/etc/tls/tls.crt", "/etc/tls/tls.key") + if err != nil { + klog.Fatalf("Could not find TLS file: %v", err) + } + // create the gRPC server and register services + ss.server = grpc.NewServer(grpc.UnaryInterceptor(token.AuthorizationInterceptor(ss.client)), grpc.Creds(creds)) + } + if !ss.enableTLS { + ss.server = grpc.NewServer(grpc.UnaryInterceptor(token.AuthorizationInterceptor(ss.client))) + } for _, svc := range ss.services { svc.RegisterService(ss.server) diff --git a/sidecar/main.go b/sidecar/main.go index e2e6d410d..6f7927add 100644 --- a/sidecar/main.go +++ b/sidecar/main.go @@ -19,6 +19,7 @@ package main import ( "context" "flag" + "fmt" "time" "github.com/csi-addons/kubernetes-csi-addons/internal/sidecar/service" @@ -43,19 +44,21 @@ func main() { csiAddonsAddress = flag.String("csi-addons-address", "/run/csi-addons/socket", "CSI Addons endopoint") nodeID = flag.String("node-id", "", "NodeID") stagingPath = flag.String("stagingpath", defaultStagingPath, "stagingpath") - controllerPort = flag.String("controller-port", "", + controllerPort = flag.Int("controller-port", 0, "The TCP network port where the gRPC server for controller request, will listen (example: `8080`)") controllerIP = flag.String("controller-ip", "", "The TCP network ip address where the gRPC server for controller request, will listen (example: `192.168.61.228`)") podName = flag.String("pod", "", "name of the Pod that contains this sidecar") podNamespace = flag.String("namespace", "", "namespace of the Pod that contains this sidecar") podUID = flag.String("pod-uid", "", "UID of the Pod that contains this sidecar") + proxyPort = flag.Int("proxy-port", 0, "The TCP port that is available for outside connections to the gRPC server (example: `9070`)") showVersion = flag.Bool("version", false, "Print Version details") leaderElectionNamespace = flag.String("leader-election-namespace", "", "The namespace where the leader election resource exists. Defaults to the pod namespace if not set.") leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Duration, in seconds, that non-leader candidates will wait to force acquire leadership. Defaults to 15 seconds.") leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.") leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 5*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.") + enableTLS = flag.Bool("tls", true, "Enable TLS(enabled by default)") ) klog.InitFlags(nil) @@ -70,7 +73,15 @@ func main() { return } - controllerEndpoint, err := sideutil.BuildEndpointURL(*controllerIP, *controllerPort, *podName, *podNamespace) + // if proxyPort is set, use that in the CSIAddonsNode.Endpoint URL + publicPort := *proxyPort + publicIP := "" + if publicPort == 0 { + publicPort = *controllerPort + publicIP = *controllerIP + } + + controllerEndpoint, err := sideutil.BuildEndpointURL(publicIP, fmt.Sprint(publicPort), *podName, *podNamespace) if err != nil { klog.Fatalf("Failed to validate controller endpoint: %v", err) } @@ -109,7 +120,7 @@ func main() { klog.Fatalf("Failed to create csiaddonsnode: %v", err) } - sidecarServer := server.NewSidecarServer(*controllerIP, *controllerPort) + sidecarServer := server.NewSidecarServer(*controllerIP, *controllerPort, *kubeClient, *enableTLS) sidecarServer.RegisterService(service.NewIdentityServer(csiClient.GetGRPCClient())) sidecarServer.RegisterService(service.NewReclaimSpaceServer(csiClient.GetGRPCClient(), kubeClient, *stagingPath)) sidecarServer.RegisterService(service.NewNetworkFenceServer(csiClient.GetGRPCClient(), kubeClient)) diff --git a/tools/go.mod b/tools/go.mod index d0c61f359..84140f118 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -1,7 +1,8 @@ module github.com/csi-addons/kubernetes-csi-addons/tools -go 1.22.0 -toolchain go1.22.8 +go 1.22.7 + +toolchain go1.23.0 require ( github.com/operator-framework/operator-sdk v1.37.0