From d97dbc2b5d069e92b40fb8fa78c494c61c7203a7 Mon Sep 17 00:00:00 2001
From: Amarnath Valluri
Date: Thu, 10 Sep 2020 06:26:18 +0300
Subject: [PATCH] operator: revise deployment status API and operator reconcile loop

Revised Deployment.Status to accommodate the deployment state conditions and
the driver state. Currently a Deployment has 3 conditions, named CertsVerified,
CertsReady, and DriverDeployed. It also records a summary of the controller and
node driver state, i.e., on how many nodes the driver is running.

In order to record the real-time status of the driver, the reconcile loop had
to be rewritten. The existing reconcile loop reacted only to deployment CR
changes and redeployed *only* the sub-objects that needed to be redeployed.
The new reconcile logic instead *refreshes* all the objects and the CR status
to keep the state consistent. The refresh merge-patches the objects to avoid
unnecessary updates.

There are two reconcile entry points:
- CR reconcile loop: refreshes all the sub-objects and the CR status.
- sub-object event handler: redeploys only the deleted/changed resource and
  updates the CR status if required.

This also includes other code cleanups that came up along the way.

TODOs:
- E2E tests for validating that the operator restores the state of a broken
  deployment, i.e., recovers deleted/modified sub-objects.

FIXES: #611
---
 pkg/apis/pmemcsi/v1alpha1/deployment_types.go |  117 ++
 .../pmemcsi/v1alpha1/zz_generated.deepcopy.go |   46 +
 .../deployment/controller_driver.go          | 1395 ++++++++++-------
 .../deployment/deployment_controller.go      |  256 ++-
 4 files changed, 1182 insertions(+), 632 deletions(-)

diff --git a/pkg/apis/pmemcsi/v1alpha1/deployment_types.go b/pkg/apis/pmemcsi/v1alpha1/deployment_types.go
index e1f8bb0f3c..04e8ff81fc 100644
--- a/pkg/apis/pmemcsi/v1alpha1/deployment_types.go
+++ b/pkg/apis/pmemcsi/v1alpha1/deployment_types.go
@@ -55,6 +55,7 @@ const (
 // Related issue : https://github.com/kubernetes-sigs/controller-tools/issues/478
 // Fails setting min/max for integers: https://github.com/helm/helm/issues/5806
+// +k8s:deepcopy-gen=true
 // DeploymentSpec defines the desired state of Deployment
 type DeploymentSpec struct {
 	// Important: Run "make operator-generate-k8s" to regenerate code after modifying this file
@@ -109,6 +110,66 @@ type DeploymentSpec struct {
 	KubeletDir string `json:"kubeletDir,omitempty"`
 }
 
+// DeploymentConditionType type for representing a deployment status condition
+type DeploymentConditionType string
+
+const (
+	// CertsVerified means the provided deployment secrets are verified and valid for usage
+	CertsVerified DeploymentConditionType = "CertsVerified"
+	// CertsReady means secrets/certificates required for running the PMEM-CSI driver
+	// are ready and the deployment could progress further
+	CertsReady DeploymentConditionType = "CertsReady"
+	// DriverDeployed means that all the sub-resources required for the deployment CR
+	// got created
+	DriverDeployed DeploymentConditionType = "DriverDeployed"
+)
+
+// +k8s:deepcopy-gen=true
+type DeploymentCondition struct {
+	// Type of condition.
+	Type DeploymentConditionType `json:"type"`
+	// Status of the condition, one of True, False, Unknown.
+	Status corev1.ConditionStatus `json:"status"`
+	// Reason is a human readable text that explains why this condition is in this state
+	// +optional
+	Reason string `json:"reason,omitempty"`
+	// Last time the condition was probed.
+	// +optional
+	LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"`
+}
+
+type DriverType int
+
+const (
+	ControllerDriver DriverType = iota
+	NodeDriver
+)
+
+func (t DriverType) String() string {
+	switch t {
+	case ControllerDriver:
+		return "controller"
+	case NodeDriver:
+		return "node"
+	}
+	return ""
+}
+
+// +k8s:deepcopy-gen=true
+type DriverStatus struct {
+	// Type represents type of the driver: controller or node
+	Type string `json:"type"`
+	// Status represents the driver status: Ready, NotReady
+	Status string `json:"status"`
+	// Reason represents the human readable text that explains why the
+	// driver is in this state.
+	Reason string `json:"reason"`
+	// LastUpdated time of the driver status
+	LastUpdated metav1.Time `json:"lastUpdated,omitempty"`
+}
+
+// +k8s:deepcopy-gen=true
+
 // DeploymentStatus defines the observed state of Deployment
 type DeploymentStatus struct {
 	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
@@ -116,6 +177,9 @@ type DeploymentStatus struct {
 	// Phase indicates the state of the deployment
 	Phase DeploymentPhase `json:"phase,omitempty"`
+	// Conditions represent the current state conditions of the deployment
+	Conditions []DeploymentCondition `json:"conditions,omitempty"`
+	Components []DriverStatus `json:"driverComponents,omitempty"`
 	// LastUpdated time of the deployment status
 	LastUpdated metav1.Time `json:"lastUpdated,omitempty"`
 }
@@ -256,6 +320,35 @@ func (c DeploymentChange) String() string {
 	}[c]
 }
 
+func (d *Deployment) SetCondition(t DeploymentConditionType, state corev1.ConditionStatus, reason string) {
+	for i, c := range d.Status.Conditions {
+		if c.Type == t {
+			// Update the stored condition in place; a range value would only be a copy.
+			d.Status.Conditions[i].Status = state
+			d.Status.Conditions[i].Reason = reason
+			d.Status.Conditions[i].LastUpdateTime = metav1.Now()
+			return
+		}
+	}
+	d.Status.Conditions = append(d.Status.Conditions, DeploymentCondition{
+		Type:           t,
+		Status:         state,
+		Reason:         reason,
+		LastUpdateTime: metav1.Now(),
+	})
+}
+
+func (d *Deployment) SetDriverStatus(t DriverType, status, reason string) {
+	if d.Status.Components == nil {
+		d.Status.Components = make([]DriverStatus, 2)
+	}
+	d.Status.Components[t] = DriverStatus{
+		Type:        t.String(),
+		Status:      status,
+		Reason:      reason,
+		LastUpdated: metav1.Now(),
+	}
+}
+
 // EnsureDefaults make sure that the deployment object has all defaults set properly
 func (d *Deployment) EnsureDefaults(operatorImage string) error {
 	if d.Spec.Image == "" {
@@ -420,6 +513,30 @@ func (d *Deployment) GetOwnerReference() metav1.OwnerReference {
 	}
 }
 
+// HaveCertificatesConfigured checks whether the deployment's certificate
+// fields are configured consistently. It returns true if a complete set of
+// certificates is provided, false if none are provided, and an error if the
+// configuration is incomplete.
+func (d *Deployment) HaveCertificatesConfigured() (bool, error) { + // Encoded private keys and certificates + caCert := d.Spec.CACert + registryPrKey := d.Spec.RegistryPrivateKey + ncPrKey := d.Spec.NodeControllerPrivateKey + registryCert := d.Spec.RegistryCert + ncCert := d.Spec.NodeControllerCert + + // sanity check + if caCert == nil { + if registryCert != nil || ncCert != nil { + return false, fmt.Errorf("incomplete deployment configuration: missing root CA certificate by which the provided certificates are signed") + } + return false, nil + } else if registryCert == nil || registryPrKey == nil || ncCert == nil || ncPrKey == nil { + return false, fmt.Errorf("incomplete deployment configuration: certificates and corresponding private keys must be provided") + } + + return true, nil +} + func GetDeploymentCRDSchema() *apiextensions.JSONSchemaProps { One := float64(1) Hundred := float64(100) diff --git a/pkg/apis/pmemcsi/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/pmemcsi/v1alpha1/zz_generated.deepcopy.go index da070eacd8..b74a372963 100644 --- a/pkg/apis/pmemcsi/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/pmemcsi/v1alpha1/zz_generated.deepcopy.go @@ -36,6 +36,22 @@ func (in *Deployment) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DeploymentCondition) DeepCopyInto(out *DeploymentCondition) { + *out = *in + in.LastUpdateTime.DeepCopyInto(&out.LastUpdateTime) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeploymentCondition. +func (in *DeploymentCondition) DeepCopy() *DeploymentCondition { + if in == nil { + return nil + } + out := new(DeploymentCondition) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DeploymentList) DeepCopyInto(out *DeploymentList) { *out = *in @@ -135,6 +151,20 @@ func (in *DeploymentSpec) DeepCopy() *DeploymentSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DeploymentStatus) DeepCopyInto(out *DeploymentStatus) { *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]DeploymentCondition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.Components != nil { + in, out := &in.Components, &out.Components + *out = make([]DriverStatus, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } in.LastUpdated.DeepCopyInto(&out.LastUpdated) } @@ -147,3 +177,19 @@ func (in *DeploymentStatus) DeepCopy() *DeploymentStatus { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DriverStatus) DeepCopyInto(out *DriverStatus) { + *out = *in + in.LastUpdated.DeepCopyInto(&out.LastUpdated) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DriverStatus. 
+func (in *DriverStatus) DeepCopy() *DriverStatus { + if in == nil { + return nil + } + out := new(DriverStatus) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/pmem-csi-operator/controller/deployment/controller_driver.go b/pkg/pmem-csi-operator/controller/deployment/controller_driver.go index 89b0d4b7ab..58001a5bfc 100644 --- a/pkg/pmem-csi-operator/controller/deployment/controller_driver.go +++ b/pkg/pmem-csi-operator/controller/deployment/controller_driver.go @@ -67,194 +67,509 @@ var AllObjectTypes = []schema.GroupVersionKind{ type PmemCSIDriver struct { *api.Deployment // operators namespace used for creating sub-resources - namespace string - + namespace string k8sVersion version.Version } -// Reconcile reconciles the driver deployment -func (d *PmemCSIDriver) Reconcile(r *ReconcileDeployment) (bool, error) { - changes := map[api.DeploymentChange]struct{}{} +type ObjectPatch struct { + obj apiruntime.Object + patch client.Patch +} - oldDeployment, foundInCache := r.deployments[d.Name] - if !foundInCache { - /* New deployment */ - r.evRecorder.Event(d, corev1.EventTypeNormal, api.EventReasonNew, "Processing new driver deployment") +func NewObjectPatch(obj, copy apiruntime.Object) *ObjectPatch { + return &ObjectPatch{ + obj: obj, + patch: client.MergeFrom(copy), } +} - copy := d.DeepCopy() +// IsNew checks if the object is a new object, i.e, the object is not +// yet stored with APIServer +func (op ObjectPatch) IsNew() bool { + if op.obj != nil { + // We ignore only possible error - client.errNotObject + // and treat it's as new object + if o, err := meta.Accessor(op.obj); err == nil { + // An object registered with API serve will have + // a non-empty(zero) resource version + return o.GetResourceVersion() == "" + } + } + return true +} + +// Reconcile reconciles the driver deployment +func (d *PmemCSIDriver) Reconcile(r *ReconcileDeployment) error { if err := d.EnsureDefaults(r.containerImage); err != nil { d.Deployment.Status.Phase = api.DeploymentPhaseFailed r.evRecorder.Event(d, corev1.EventTypeWarning, api.EventReasonFailed, err.Error()) - return true, err - } - - if foundInCache { - // Cached deployment does not have defaults stored in it. - // So, reset those defaults before comparing otherwise - // those set defaults `d.Deployment` will be detected as - // changes. - cached := oldDeployment.DeepCopy() - if err := cached.EnsureDefaults(r.containerImage); err != nil { - r.evRecorder.Event(d, corev1.EventTypeWarning, api.EventReasonFailed, err.Error()) - return true, err + return err + } + + klog.Infof("Deployment: %q, state %q ", d.Name, d.Status.Phase) + var allObjects []apiruntime.Object + redeployAll := func() error { + var o apiruntime.Object + var err error + if s, err := d.redeploySecrets(r); err != nil { + return err + } else { + for _, o := range s { + allObjects = append(allObjects, o) + } } - changes = d.Compare(cached) - } - - klog.Infof("Deployment: %q, state %q, changes %v, in cache %v", d.Name, d.Status.Phase, changes, foundInCache) - - requeue, err := d.reconcileDeploymentChanges(r, changes, foundInCache) - // Some clients(fake client used in tests) do not support patching status reliably - // and updates even spec changes. So, revert any spec changes(like deployment defaults) we made. - // Those are not supposed to get saved on the API server. - d.Deployment.Spec = copy.Spec - if err == nil { - // If everything ok, update local cache - r.deployments[d.Name] = d.Deployment - - // TODO: wait for functional driver before entering "running" phase. 
- // For now we go straight to it. - d.Status.Phase = api.DeploymentPhaseRunning - r.evRecorder.Event(d, corev1.EventTypeNormal, api.EventReasonRunning, "Driver deployment successful") - } else if d.Status.Phase == api.DeploymentPhaseNew { - d.Status.Phase = api.DeploymentPhaseFailed - r.evRecorder.Event(d, corev1.EventTypeWarning, api.EventReasonFailed, err.Error()) + if o, err = d.redeployProvisionerRole(r); err != nil { + return fmt.Errorf("failed to update RBAC role: %v", err) + } + allObjects = append(allObjects, o) + if o, err = d.redeployProvisionerRoleBinding(r); err != nil { + return fmt.Errorf("failed to update RBAC role bindings: %v", err) + } + allObjects = append(allObjects, o) + if o, err = d.redeployProvisionerClusterRole(r); err != nil { + return fmt.Errorf("failed to update RBAC cluster role: %v", err) + } + allObjects = append(allObjects, o) + if o, err = d.redeployProvisionerClusterRoleBinding(r); err != nil { + return fmt.Errorf("failed to update RBAC cluster role bindings: %v", err) + } + allObjects = append(allObjects, o) + if o, err = d.redeployServiceAccount(r); err != nil { + return fmt.Errorf("failed to update service account: %v", err) + } + allObjects = append(allObjects, o) + if o, err = d.redeployControllerService(r); err != nil { + return fmt.Errorf("failed to update controller service: %v", err) + } + allObjects = append(allObjects, o) + if o, err = d.redeployMetricsService(r); err != nil { + return fmt.Errorf("failed to update controller service: %v", err) + } + allObjects = append(allObjects, o) + if o, err = d.redeployCSIDriver(r); err != nil { + return fmt.Errorf("failed to update CSI driver: %v", err) + } + allObjects = append(allObjects, o) + if o, err = d.redeployControllerDriver(r); err != nil { + return fmt.Errorf("failed to update controller driver: %v", err) + } + allObjects = append(allObjects, o) + if o, err = d.redeployNodeDriver(r); err != nil { + return fmt.Errorf("failed to update node driver: %v", err) + } + allObjects = append(allObjects, o) + return nil + } + + if err := redeployAll(); err != nil { + d.SetCondition(api.DriverDeployed, corev1.ConditionFalse, err.Error()) + return err + } + + d.SetCondition(api.DriverDeployed, corev1.ConditionTrue, "Driver deployed successfully.") + + klog.Infof("Deployed '%d' objects.", len(allObjects)) + // FIXME(avalluri): Limit the obsolete object deletion either to only version upgrades + // or on operator restart. + if err := d.deleteObsoleteObjects(r, allObjects); err != nil { + return fmt.Errorf("Delete obsolete objects failed with error: %v", err) } - return requeue, err + return nil } -// reconcileDeploymentChanges examines the changes and updates the appropriate objects. -// In case of foundInCache is false, all the objects gets refreshed/updated. 
-func (d *PmemCSIDriver) reconcileDeploymentChanges(r *ReconcileDeployment, changes map[api.DeploymentChange]struct{}, foundInCache bool) (bool, error) { - updateController := false - updateNodeDriver := false - updateSecrets := false - updateAll := false // Update all objects of the deployment +// getSubObject retrieves the latest revision of given object type from the API server +// And checks if that object is owned by the current deployment CR +func (d *PmemCSIDriver) getSubObject(r *ReconcileDeployment, obj apiruntime.Object) error { + objMeta, err := meta.Accessor(obj) + if err != nil { + return fmt.Errorf("internal error %T: %v", obj, err) + } - if !foundInCache { - // Running deployment not found in cache, possibly result of operator - // restart, refresh all objects. - updateAll = true + klog.V(4).Infof("Getting object %q of type %T", objMeta.GetName(), obj) + if err := r.Get(obj); err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + ownerRef := d.GetOwnerReference() + if !isOwnedBy(objMeta, &ownerRef) { + return fmt.Errorf("'%s' of type %T is not owned by '%s'", objMeta.GetName(), obj, ownerRef.Name) } - for c := range changes { - var err error - switch c { - case api.ControllerResources, api.ProvisionerImage: - updateController = true - case api.NodeResources, api.NodeRegistrarImage: - updateNodeDriver = true - case api.DriverImage, api.LogLevel, api.PullPolicy: - updateController = true - updateNodeDriver = true - case api.DriverMode: - updateNodeDriver = true - case api.NodeSelector: - updateNodeDriver = true - case api.PMEMPercentage: - updateNodeDriver = true - case api.Labels: - updateAll = true - case api.CACertificate, api.RegistryCertificate, api.NodeControllerCertificate: - updateSecrets = true - case api.KubeletDir: - updateNodeDriver = true + return nil +} + +// updateSubObject writes the object changes to the API server. +func (d *PmemCSIDriver) updateSubObject(r *ReconcileDeployment, op *ObjectPatch) error { + objMeta, err := meta.Accessor(op.obj) + if err != nil { + return fmt.Errorf("internal error %T: %v", op.obj, err) + } + + // Set labels just before creating/patching. + // NOTE: Labels cannot be set before creating the client.Patch + // otherwise they get missed from the patch diff. + objMeta.SetLabels(d.Spec.Labels) + if op.IsNew() { + // For unknown reason client.Create() clearing off the + // GVK on obj, So restore it manually. + gvk := op.obj.GetObjectKind().GroupVersionKind() + err := r.client.Create(context.TODO(), op.obj) + op.obj.GetObjectKind().SetGroupVersionKind(gvk) + return err + } + klog.V(4).Infof("Updating object %q of type %T", objMeta.GetName(), op.obj) + return r.client.Patch(context.TODO(), op.obj, op.patch) +} + +// redeploySecrets ensures that the secrets get (re)deployed that are +// required for running the driver. +// +// First it checks if the deployment is configured with the needed certificates. +// If provided, validate and (re)create secrets with using them. +// Else, provision new certificates only if no existing secrets and deploy. 
+func (d *PmemCSIDriver) redeploySecrets(r *ReconcileDeployment) ([]*corev1.Secret, error) { + rs := &corev1.Secret{ + TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"}, + ObjectMeta: d.getObjectMeta(d.registrySecretName(), false), + } + if err := d.getSubObject(r, rs); err != nil { + return nil, err + } + rop := NewObjectPatch(rs, rs.DeepCopy()) + + ns := &corev1.Secret{ + TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"}, + ObjectMeta: d.getObjectMeta(d.nodeSecretName(), false), + } + if err := d.getSubObject(r, ns); err != nil { + return nil, err + } + nop := NewObjectPatch(ns, ns.DeepCopy()) + + update := func() error { + d.getRegistrySecrets(rs) + if err := d.updateSubObject(r, rop); err != nil { + return fmt.Errorf("Failed to update registry secrets: %v", err) } - if err != nil { - klog.Warningf("Incompatible change of deployment occurred: %v", err) + d.getNodeSecrets(ns) + if err := d.updateSubObject(r, nop); err != nil { + return fmt.Errorf("Failed to update node secrets: %v", err) } + return nil } - objects := []apiruntime.Object{} + certsProvided, err := d.HaveCertificatesConfigured() + if err != nil { + d.SetCondition(api.CertsVerified, corev1.ConditionFalse, err.Error()) + return nil, err + } - // Force update all deployment objects - if updateAll { - klog.Infof("Updating all objects for deployment %q", d.Name) - objs, err := d.getDeploymentObjects() - if err != nil { - return true, err - } - objects = append(objects, objs...) - - // If not found cache might be result of operator restart - // check if this deployment has any objects to be deleted - if !foundInCache { - if err := d.deleteObsoleteObjects(r, objs); err != nil { - klog.Infof("Failed to delete obsolete objects: %v", err) - return true, err - } + if certsProvided { + // Use provided certificates + if err := d.validateCertificates(); err != nil { + d.SetCondition(api.CertsVerified, corev1.ConditionFalse, err.Error()) + return nil, err } - } else { - if updateSecrets { - objs, err := d.getSecrets() - if err != nil { - return true, err - } - objects = append(objects, objs...) + d.SetCondition(api.CertsVerified, corev1.ConditionTrue, "Driver certificates validated.") + + // update secrets if required + if err := update(); err != nil { + d.SetCondition(api.CertsReady, corev1.ConditionFalse, err.Error()) + return nil, err } - if updateController { - klog.Infof("Updating controller driver for deployment %q", d.Name) - objects = append(objects, d.getControllerStatefulSet()) + } else if rop.IsNew() || nop.IsNew() { + // Provision new self-signed certificates if not already present + if err := d.provisionCertificates(); err != nil { + d.SetCondition(api.CertsReady, corev1.ConditionFalse, err.Error()) + return nil, err } - if updateNodeDriver { - klog.Infof("Updating node driver for deployment %q", d.Name) - objects = append(objects, d.getNodeDaemonSet()) + + if err := update(); err != nil { + d.SetCondition(api.CertsReady, corev1.ConditionFalse, err.Error()) + return nil, err } } - for _, obj := range objects { - // Services needs special treatment as they have some immutable field(s) - // So, we cannot refresh the existing one with new service object. 
- if s, ok := obj.(*corev1.Service); ok { - existingService := &corev1.Service{ - TypeMeta: s.TypeMeta, - ObjectMeta: s.ObjectMeta, - } - err := r.Get(existingService) - if err != nil { - if !errors.IsNotFound(err) { - return true, err - } - // Create the missing service now - klog.Infof("'%s' service not found, creating new one", s.GetName()) - if err := r.Create(s); err != nil { - return true, err - } - continue - } + d.SetCondition(api.CertsReady, corev1.ConditionTrue, "Driver certificates are available.") + return []*corev1.Secret{rs, ns}, nil +} - existingService.Spec.Ports = []corev1.ServicePort{} - for _, p := range s.Spec.Ports { - existingService.Spec.Ports = append(existingService.Spec.Ports, p) - } - existingService.Spec.Selector = s.Spec.Selector - if len(s.Labels) > 0 { - if existingService.Labels == nil { - existingService.Labels = map[string]string{} - } - for key, value := range s.Labels { - existingService.Labels[key] = value - } - } - klog.Infof("updating service '%s' service ports and selector", s.GetName()) - if err := r.Update(existingService); err != nil { - return true, err - } - } else { - if err := r.UpdateOrCreate(obj); err != nil { - return true, err - } +// redeployNodeDriver deploys the node daemon set and records its status on to CR status. +func (d *PmemCSIDriver) redeployNodeDriver(r *ReconcileDeployment) (*appsv1.DaemonSet, error) { + ds := &appsv1.DaemonSet{ + TypeMeta: metav1.TypeMeta{Kind: "DaemonSet", APIVersion: "apps/v1"}, + ObjectMeta: d.getObjectMeta(d.nodeDriverName(), false), + } + if err := d.getSubObject(r, ds); err != nil { + return nil, err + } + op := NewObjectPatch(ds, ds.DeepCopy()) + d.getNodeDaemonSet(ds) + if err := d.updateSubObject(r, op); err != nil { + return nil, err + } + + // Update node driver status is status object + status := "NotReady" + reason := "Unknown" + if ds.Status.NumberAvailable == 0 { + reason = "Node daemon set has not started yet." + } else if ds.Status.NumberReady == ds.Status.NumberAvailable { + status = "Ready" + reason = fmt.Sprintf("All %d node driver pod(s) running successfully", ds.Status.NumberAvailable) + } else { + reason = fmt.Sprintf("%d out of %d driver pods are ready", ds.Status.NumberReady, ds.Status.NumberAvailable) + } + d.SetDriverStatus(api.NodeDriver, status, reason) + + return ds, nil +} + +// redeployControllerDriver deploys the controller stateful set and records its status on to CR status. +func (d *PmemCSIDriver) redeployControllerDriver(r *ReconcileDeployment) (*appsv1.StatefulSet, error) { + ss := &appsv1.StatefulSet{ + TypeMeta: metav1.TypeMeta{Kind: "StatefulSet", APIVersion: "apps/v1"}, + ObjectMeta: d.getObjectMeta(d.controllerDriverName(), false), + } + if err := d.getSubObject(r, ss); err != nil { + return nil, err + } + op := NewObjectPatch(ss, ss.DeepCopy()) + d.getControllerStatefulSet(ss) + if err := d.updateSubObject(r, op); err != nil { + return nil, err + } + + // Update controller status is status object + status := "NotReady" + reason := "Unknown" + if ss.Status.Replicas == 0 { + reason = "Controller stateful set has not started yet." 
+ } else if ss.Status.ReadyReplicas == ss.Status.Replicas { + status = "Ready" + reason = fmt.Sprintf("%d instance(s) of controller driver is running successfully", ss.Status.ReadyReplicas) + } else { + reason = fmt.Sprintf("Waiting for stateful set to be ready: %d of %d replicas are ready", + ss.Status.ReadyReplicas, ss.Status.Replicas) + } + d.SetDriverStatus(api.ControllerDriver, status, reason) + + return ss, nil +} + +func (d *PmemCSIDriver) redeployControllerService(r *ReconcileDeployment) (*corev1.Service, error) { + s := &corev1.Service{ + TypeMeta: metav1.TypeMeta{Kind: "Service", APIVersion: "v1"}, + ObjectMeta: d.getObjectMeta(d.controllerServiceName(), false), + } + + if err := d.getSubObject(r, s); err != nil { + return nil, err + } + + op := NewObjectPatch(s, s.DeepCopy()) + d.getControllerService(s) + if err := d.updateSubObject(r, op); err != nil { + return nil, err + } + return s, nil + +} + +func (d *PmemCSIDriver) redeployMetricsService(r *ReconcileDeployment) (*corev1.Service, error) { + o := &corev1.Service{ + TypeMeta: metav1.TypeMeta{Kind: "Service", APIVersion: "v1"}, + ObjectMeta: d.getObjectMeta(d.metricsServiceName(), false), + } + if err := d.getSubObject(r, o); err != nil { + return nil, err + } + op := NewObjectPatch(o, o.DeepCopy()) + d.getMetricsService(o) + if err := d.updateSubObject(r, op); err != nil { + return nil, err + } + return o, nil +} + +func (d *PmemCSIDriver) redeployCSIDriver(r *ReconcileDeployment) (*storagev1beta1.CSIDriver, error) { + o := &storagev1beta1.CSIDriver{ + TypeMeta: metav1.TypeMeta{Kind: "CSIDriver", APIVersion: "storage.k8s.io/v1beta1"}, + ObjectMeta: d.getObjectMeta(d.csiDriverName(), true), + } + if err := d.getSubObject(r, o); err != nil { + return nil, err + } + op := NewObjectPatch(o, o.DeepCopy()) + d.getCSIDriver(o) + if err := d.updateSubObject(r, op); err != nil { + return nil, err + } + return o, nil +} + +func (d *PmemCSIDriver) redeployProvisionerRole(r *ReconcileDeployment) (*rbacv1.Role, error) { + o := &rbacv1.Role{ + TypeMeta: metav1.TypeMeta{Kind: "Role", APIVersion: "rbac.authorization.k8s.io/v1"}, + ObjectMeta: d.getObjectMeta(d.provisionerRoleName(), false), + } + if err := d.getSubObject(r, o); err != nil { + return nil, err + } + op := NewObjectPatch(o, o.DeepCopy()) + d.getControllerProvisionerRole(o) + if err := d.updateSubObject(r, op); err != nil { + return nil, err + } + return o, nil +} + +func (d *PmemCSIDriver) redeployProvisionerRoleBinding(r *ReconcileDeployment) (*rbacv1.RoleBinding, error) { + o := &rbacv1.RoleBinding{ + TypeMeta: metav1.TypeMeta{Kind: "RoleBinding", APIVersion: "rbac.authorization.k8s.io/v1"}, + ObjectMeta: d.getObjectMeta(d.provisionerRoleBindingName(), false), + } + if err := d.getSubObject(r, o); err != nil { + return nil, err + } + op := NewObjectPatch(o, o.DeepCopy()) + d.getControllerProvisionerRoleBinding(o) + if err := d.updateSubObject(r, op); err != nil { + return nil, err + } + return o, nil +} + +func (d *PmemCSIDriver) redeployProvisionerClusterRole(r *ReconcileDeployment) (*rbacv1.ClusterRole, error) { + o := &rbacv1.ClusterRole{ + TypeMeta: metav1.TypeMeta{Kind: "ClusterRole", APIVersion: "rbac.authorization.k8s.io/v1"}, + ObjectMeta: d.getObjectMeta(d.provisionerClusterRoleName(), true), + } + if err := d.getSubObject(r, o); err != nil { + return nil, err + } + op := NewObjectPatch(o, o.DeepCopy()) + d.getControllerProvisionerClusterRole(o) + if err := d.updateSubObject(r, op); err != nil { + return nil, err + } + return o, nil + +} + +func (d *PmemCSIDriver) 
redeployProvisionerClusterRoleBinding(r *ReconcileDeployment) (*rbacv1.ClusterRoleBinding, error) {
+	o := &rbacv1.ClusterRoleBinding{
+		TypeMeta:   metav1.TypeMeta{Kind: "ClusterRoleBinding", APIVersion: "rbac.authorization.k8s.io/v1"},
+		ObjectMeta: d.getObjectMeta(d.provisionerClusterRoleBindingName(), true),
+	}
+	if err := d.getSubObject(r, o); err != nil {
+		return nil, err
+	}
+	op := NewObjectPatch(o, o.DeepCopy())
+	d.getControllerProvisionerClusterRoleBinding(o)
+	if err := d.updateSubObject(r, op); err != nil {
+		return nil, err
+	}
+	return o, nil
+}
+
+func (d *PmemCSIDriver) redeployServiceAccount(r *ReconcileDeployment) (*corev1.ServiceAccount, error) {
+	o := &corev1.ServiceAccount{
+		TypeMeta:   metav1.TypeMeta{Kind: "ServiceAccount", APIVersion: "v1"},
+		ObjectMeta: d.getObjectMeta(d.serviceAccountName(), false),
+	}
+	if err := d.getSubObject(r, o); err != nil {
+		return nil, err
+	}
+	op := NewObjectPatch(o, o.DeepCopy())
+	/* nothing to customize for service account */
+	if err := d.updateSubObject(r, op); err != nil {
+		return nil, err
+	}
+	return o, nil
+}
+
+// HandleEvent handles the delete/update events received on sub-objects
+func (d *PmemCSIDriver) HandleEvent(meta metav1.Object, obj apiruntime.Object, r *ReconcileDeployment) error {
+	switch obj.(type) {
+	case *corev1.Secret:
+		klog.V(4).Infof("Redeploying driver secrets")
+		if _, err := d.redeploySecrets(r); err != nil {
+			return fmt.Errorf("failed to redeploy %q secrets: %v", meta.GetName(), err)
+		}
+	case *appsv1.DaemonSet:
+		klog.V(4).Infof("Redeploying node driver")
+		org := d.DeepCopy()
+		if _, err := d.redeployNodeDriver(r); err != nil {
+			return fmt.Errorf("failed to redeploy node driver: %v", err)
+		}
+		if err := r.PatchDeploymentStatus(d.Deployment, client.MergeFrom(org)); err != nil {
+			return fmt.Errorf("failed to update deployment CR status: %v", err)
+		}
+	case *appsv1.StatefulSet:
+		klog.V(4).Infof("Redeploying controller driver")
+		org := d.DeepCopy()
+		if _, err := d.redeployControllerDriver(r); err != nil {
+			return fmt.Errorf("failed to redeploy controller driver: %v", err)
+		}
+		if err := r.PatchDeploymentStatus(d.Deployment, client.MergeFrom(org)); err != nil {
+			return fmt.Errorf("failed to update deployment CR status: %v", err)
+		}
+	case *rbacv1.Role:
+		klog.V(4).Infof("Redeploying provisioner RBAC role: %q", meta.GetName())
+		if _, err := d.redeployProvisionerRole(r); err != nil {
+			return fmt.Errorf("failed to redeploy %q provisioner role: %v", meta.GetName(), err)
+		}
+	case *rbacv1.ClusterRole:
+		klog.V(4).Infof("Redeploying provisioner cluster role: %q", meta.GetName())
+		if _, err := d.redeployProvisionerClusterRole(r); err != nil {
+			return fmt.Errorf("failed to redeploy %q cluster role: %v", meta.GetName(), err)
+		}
+	case *rbacv1.RoleBinding:
+		klog.V(4).Infof("Redeploying provisioner role binding")
+		if _, err := d.redeployProvisionerRoleBinding(r); err != nil {
+			return fmt.Errorf("failed to redeploy %q role binding: %v", meta.GetName(), err)
+		}
+	case *rbacv1.ClusterRoleBinding:
+		klog.V(4).Infof("Redeploying provisioner cluster role binding: %q", meta.GetName())
+		if _, err := d.redeployProvisionerClusterRoleBinding(r); err != nil {
+			return fmt.Errorf("failed to redeploy %q cluster role binding: %v", meta.GetName(), err)
+		}
+	case *corev1.ServiceAccount:
+		klog.V(4).Infof("Redeploying service account")
+		if _, err := d.redeployServiceAccount(r); err != nil {
+			return fmt.Errorf("failed to redeploy %q service account: %v", meta.GetName(), err)
+		}
+	case *corev1.Service:
+		
var err error + klog.V(4).Infof("Redeploying service: %s", meta.GetName()) + if meta.GetName() == d.controllerServiceName() { + _, err = d.redeployControllerService(r) + } else if meta.GetName() == d.metricsServiceName() { + _, err = d.redeployMetricsService(r) + } + if err != nil { + return fmt.Errorf("failed to redeploy %q service: %v", meta.GetName(), err) + } + case *storagev1beta1.CSIDriver: + klog.V(4).Infof("Redeploying redeploy %q CSIDriver", meta.GetName()) + if _, err := d.redeployCSIDriver(r); err != nil { + return fmt.Errorf("failed to redeploy %q CSIDriver: %v", meta.GetName(), err) + } + default: + klog.V(3).Infof("Ignoring event on '%s' of type %T", meta.GetName(), obj) } - return false, nil + return nil } func objectIsObsolete(objList []apiruntime.Object, toFind unstructured.Unstructured) (bool, error) { + klog.V(5).Infof("Checking if %q of type %q is obsolete...", toFind.GetName(), toFind.GetObjectKind().GroupVersionKind()) for i := range objList { metaObj, err := meta.Accessor(objList[i]) if err != nil { @@ -280,6 +595,11 @@ func (d *PmemCSIDriver) isOwnerOf(obj unstructured.Unstructured) bool { } func (d *PmemCSIDriver) deleteObsoleteObjects(r *ReconcileDeployment, newObjects []apiruntime.Object) error { + for _, obj := range newObjects { + metaObj, _ := meta.Accessor(obj) + klog.V(5).Infof("==>%q type %q", metaObj.GetName(), obj.GetObjectKind().GroupVersionKind()) + } + for _, gvk := range AllObjectTypes { list := &unstructured.UnstructuredList{} list.SetGroupVersionKind(gvk) @@ -287,7 +607,7 @@ func (d *PmemCSIDriver) deleteObsoleteObjects(r *ReconcileDeployment, newObjects Namespace: d.namespace, } - klog.Infof("Fetching '%s' list with options: %v", gvk, opts.Namespace) + klog.V(5).Infof("Fetching '%s' list with options: %v", gvk, opts.Namespace) if err := r.client.List(context.TODO(), list, opts); err != nil { return err } @@ -303,6 +623,8 @@ func (d *PmemCSIDriver) deleteObsoleteObjects(r *ReconcileDeployment, newObjects if !obsolete { continue } + klog.V(3).Infof("Deleting %q of type '%s' is obsolete object", obj.GetName(), obj.GetObjectKind().GroupVersionKind()) + o, err := scheme.Scheme.New(obj.GetObjectKind().GroupVersionKind()) if err != nil { return err @@ -322,101 +644,109 @@ func (d *PmemCSIDriver) deleteObsoleteObjects(r *ReconcileDeployment, newObjects return nil } -// getDeploymentObjects returns all objects that are part of a driver deployment. 
-func (d *PmemCSIDriver) getDeploymentObjects() ([]apiruntime.Object, error) { - objects, err := d.getSecrets() - if err != nil { - return nil, err - } +func (d *PmemCSIDriver) registrySecretName() string { + return d.GetHyphenedName() + "-registry-secrets" +} - objects = append(objects, - d.getCSIDriver(), - d.getControllerServiceAccount(), - d.getControllerProvisionerRole(), - d.getControllerProvisionerRoleBinding(), - d.getControllerProvisionerClusterRole(), - d.getControllerProvisionerClusterRoleBinding(), - d.getControllerService(), - d.getMetricsService(), - d.getControllerStatefulSet(), - d.getNodeDaemonSet(), - ) - - return objects, nil +func (d *PmemCSIDriver) nodeSecretName() string { + return d.GetHyphenedName() + "-node-secrets" } -func (d *PmemCSIDriver) getSecrets() ([]apiruntime.Object, error) { - // Encoded private keys and certificates - caCert := d.Spec.CACert - registryPrKey := d.Spec.RegistryPrivateKey - ncPrKey := d.Spec.NodeControllerPrivateKey - registryCert := d.Spec.RegistryCert - ncCert := d.Spec.NodeControllerCert - - // sanity check - if caCert == nil { - if registryCert != nil || ncCert != nil { - return nil, fmt.Errorf("incomplete deployment configuration: missing root CA certificate by which the provided certificates are signed") - } - } else if registryCert == nil || registryPrKey == nil || ncCert == nil || ncPrKey == nil { - return nil, fmt.Errorf("incomplete deployment configuration: certificates and corresponding private keys must be provided") - } +func (d *PmemCSIDriver) csiDriverName() string { + return d.GetName() +} - if caCert == nil { - var prKey *rsa.PrivateKey +func (d *PmemCSIDriver) controllerServiceName() string { + return d.GetHyphenedName() + "-controller" +} - ca, err := pmemtls.NewCA(nil, nil) - if err != nil { - return nil, fmt.Errorf("failed to initialize CA: %v", err) - } - caCert = ca.EncodedCertificate() +func (d *PmemCSIDriver) metricsServiceName() string { + return d.GetHyphenedName() + "-metrics" +} - if registryPrKey != nil { - prKey, err = pmemtls.DecodeKey(registryPrKey) - } else { - prKey, err = pmemtls.NewPrivateKey() - registryPrKey = pmemtls.EncodeKey(prKey) - } - if err != nil { - return nil, err - } +func (d *PmemCSIDriver) serviceAccountName() string { + return d.GetHyphenedName() + "-controller" +} - cert, err := ca.GenerateCertificate("pmem-registry", prKey.Public()) - if err != nil { - return nil, fmt.Errorf("failed to generate registry certificate: %v", err) - } - registryCert = pmemtls.EncodeCert(cert) +func (d *PmemCSIDriver) provisionerRoleName() string { + return d.GetHyphenedName() + "-external-provisioner-cfg" +} - if ncPrKey == nil { - prKey, err = pmemtls.NewPrivateKey() - ncPrKey = pmemtls.EncodeKey(prKey) - } else { - prKey, err = pmemtls.DecodeKey(ncPrKey) - } - if err != nil { - return nil, err - } +func (d *PmemCSIDriver) provisionerRoleBindingName() string { + return d.GetHyphenedName() + "-csi-provisioner-role-cfg" +} - cert, err = ca.GenerateCertificate("pmem-node-controller", prKey.Public()) - if err != nil { - return nil, err - } - ncCert = pmemtls.EncodeCert(cert) +func (d *PmemCSIDriver) provisionerClusterRoleName() string { + return d.GetHyphenedName() + "-external-provisioner-runner" +} + +func (d *PmemCSIDriver) provisionerClusterRoleBindingName() string { + return d.GetHyphenedName() + "-csi-provisioner-role" +} + +func (d *PmemCSIDriver) nodeDriverName() string { + return d.GetHyphenedName() + "-node" +} + +func (d *PmemCSIDriver) controllerDriverName() string { + return d.GetHyphenedName() + 
"-controller" +} + +func (d *PmemCSIDriver) getRegistrySecrets(secret *corev1.Secret) { + d.getSecret(secret, "registry-secrets", d.Spec.CACert, d.Spec.RegistryPrivateKey, d.Spec.RegistryCert) +} + +func (d *PmemCSIDriver) getNodeSecrets(secret *corev1.Secret) { + d.getSecret(secret, "node-secrets", d.Spec.CACert, d.Spec.NodeControllerPrivateKey, d.Spec.NodeControllerCert) +} + +func (d *PmemCSIDriver) provisionCertificates() error { + var prKey *rsa.PrivateKey + + klog.Infof("Provisioning new certificates for deployment '%s'", d.Name) + ca, err := pmemtls.NewCA(nil, nil) + if err != nil { + return fmt.Errorf("failed to initialize CA: %v", err) + } + d.Spec.CACert = ca.EncodedCertificate() + + if d.Spec.RegistryPrivateKey != nil { + prKey, err = pmemtls.DecodeKey(d.Spec.RegistryPrivateKey) } else { - // check if the provided certificates are valid - if err := validateCertificates(caCert, registryPrKey, registryCert, ncPrKey, ncCert); err != nil { - return nil, fmt.Errorf("validate CA certificates: %v", err) - } + prKey, err = pmemtls.NewPrivateKey() + d.Spec.RegistryPrivateKey = pmemtls.EncodeKey(prKey) + } + if err != nil { + return err + } + + cert, err := ca.GenerateCertificate("pmem-registry", prKey.Public()) + if err != nil { + return fmt.Errorf("failed to generate registry certificate: %v", err) } + d.Spec.RegistryCert = pmemtls.EncodeCert(cert) + + if d.Spec.NodeControllerPrivateKey == nil { + prKey, err = pmemtls.NewPrivateKey() + d.Spec.NodeControllerPrivateKey = pmemtls.EncodeKey(prKey) + } else { + prKey, err = pmemtls.DecodeKey(d.Spec.NodeControllerPrivateKey) + } + if err != nil { + return err + } + + cert, err = ca.GenerateCertificate("pmem-node-controller", prKey.Public()) + if err != nil { + return err + } + d.Spec.NodeControllerCert = pmemtls.EncodeCert(cert) // Instead of waiting for next GC cycle, initiate garbage collector manually // so that the unneeded CA key, certificate get removed. defer runtime.GC() - return []apiruntime.Object{ - d.getSecret("registry-secrets", caCert, registryPrKey, registryCert), - d.getSecret("node-secrets", caCert, ncPrKey, ncCert), - }, nil + return nil } // validateCertificates ensures that the given keys and certificates are valid @@ -424,7 +754,7 @@ func (d *PmemCSIDriver) getSecrets() ([]apiruntime.Object, error) { // a tls client connection to that sever using the provided keys and certificates. 
// As we use mutual-tls, testing one server is enough to make sure that the provided // certificates works -func validateCertificates(caCert, regKey, regCert, ncKey, ncCert []byte) error { +func (d *PmemCSIDriver) validateCertificates() error { tmp, err := ioutil.TempDir("", "pmem-csi-validate-certs-*") if err != nil { return err @@ -432,12 +762,12 @@ func validateCertificates(caCert, regKey, regCert, ncKey, ncCert []byte) error { defer os.RemoveAll(tmp) // Registry server config - regCfg, err := pmemgrpc.ServerTLS(caCert, regCert, regKey, "pmem-node-controller") + regCfg, err := pmemgrpc.ServerTLS(d.Spec.CACert, d.Spec.RegistryCert, d.Spec.RegistryPrivateKey, "pmem-node-controller") if err != nil { return err } - clientCfg, err := pmemgrpc.ClientTLS(caCert, ncCert, ncKey, "pmem-registry") + clientCfg, err := pmemgrpc.ClientTLS(d.Spec.CACert, d.Spec.NodeControllerCert, d.Spec.NodeControllerPrivateKey, "pmem-registry") if err != nil { return err } @@ -460,20 +790,13 @@ func validateCertificates(caCert, regKey, regCert, ncKey, ncCert []byte) error { return nil } -func (d *PmemCSIDriver) getCSIDriver() *storagev1beta1.CSIDriver { +func (d *PmemCSIDriver) getCSIDriver(csiDriver *storagev1beta1.CSIDriver) { attachRequired := false podInfoOnMount := true - csiDriver := &storagev1beta1.CSIDriver{ - TypeMeta: metav1.TypeMeta{ - Kind: "CSIDriver", - APIVersion: "storage.k8s.io/v1beta1", - }, - ObjectMeta: d.getObjectMeta(d.GetName(), true), - Spec: storagev1beta1.CSIDriverSpec{ - AttachRequired: &attachRequired, - PodInfoOnMount: &podInfoOnMount, - }, + csiDriver.Spec = storagev1beta1.CSIDriverSpec{ + AttachRequired: &attachRequired, + PodInfoOnMount: &podInfoOnMount, } // Volume lifecycle modes are supported only after k8s v1.16 @@ -483,419 +806,331 @@ func (d *PmemCSIDriver) getCSIDriver() *storagev1beta1.CSIDriver { storagev1beta1.VolumeLifecycleEphemeral, } } - - return csiDriver } -func (d *PmemCSIDriver) getSecret(cn string, ca, encodedKey, encodedCert []byte) *corev1.Secret { - return &corev1.Secret{ - TypeMeta: metav1.TypeMeta{ - Kind: "Secret", - APIVersion: "v1", - }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-"+cn, false), - Type: corev1.SecretTypeTLS, - Data: map[string][]byte{ - // Same names as in the example secrets and in the v1 API. - "ca.crt": ca, // no standard name for this one - "tls.key": encodedKey, // v1.TLSPrivateKeyKey - "tls.crt": encodedCert, // v1.TLSCertKey - }, +func (d *PmemCSIDriver) getSecret(secret *corev1.Secret, cn string, ca, encodedKey, encodedCert []byte) { + secret.Type = corev1.SecretTypeTLS + secret.Data = map[string][]byte{ + // Same names as in the example secrets and in the v1 API. 
+ "ca.crt": ca, // no standard name for this one + "tls.key": encodedKey, // v1.TLSPrivateKeyKey + "tls.crt": encodedCert, // v1.TLSCertKey } } -func (d *PmemCSIDriver) getControllerService() *corev1.Service { - return &corev1.Service{ - TypeMeta: metav1.TypeMeta{ - Kind: "Service", - APIVersion: "v1", - }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-controller", false), - Spec: corev1.ServiceSpec{ - Type: corev1.ServiceTypeClusterIP, - Ports: []corev1.ServicePort{ - corev1.ServicePort{ - Port: controllerServicePort, - TargetPort: intstr.IntOrString{ - IntVal: controllerServicePort, - }, - }, - }, - Selector: map[string]string{ - "app": d.GetHyphenedName() + "-controller", - }, - }, +func (d *PmemCSIDriver) getService(service *corev1.Service, t corev1.ServiceType, port int32) { + service.Spec.Type = t + if service.Spec.Ports == nil { + service.Spec.Ports = append(service.Spec.Ports, corev1.ServicePort{}) + } + service.Spec.Ports[0].Port = port + service.Spec.Ports[0].TargetPort = intstr.IntOrString{ + IntVal: port, + } + service.Spec.Selector = map[string]string{ + "app": d.GetHyphenedName() + "-controller", } } -func (d *PmemCSIDriver) getMetricsService() *corev1.Service { - return &corev1.Service{ - TypeMeta: metav1.TypeMeta{ - Kind: "Service", - APIVersion: "v1", - }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-metrics", false), - Spec: corev1.ServiceSpec{ - Type: corev1.ServiceTypeNodePort, - Ports: []corev1.ServicePort{ - corev1.ServicePort{ - Port: controllerMetricsPort, - TargetPort: intstr.IntOrString{ - IntVal: controllerMetricsPort, - }, - }, - }, - Selector: map[string]string{ - "app": d.GetHyphenedName() + "-controller", - }, - }, - } +func (d *PmemCSIDriver) getControllerService(service *corev1.Service) { + d.getService(service, corev1.ServiceTypeClusterIP, controllerServicePort) } -func (d *PmemCSIDriver) getControllerServiceAccount() *corev1.ServiceAccount { - return &corev1.ServiceAccount{ - TypeMeta: metav1.TypeMeta{ - Kind: "ServiceAccount", - APIVersion: "v1", - }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-controller", false), - } +func (d *PmemCSIDriver) getMetricsService(service *corev1.Service) { + d.getService(service, corev1.ServiceTypeNodePort, controllerMetricsPort) } -func (d *PmemCSIDriver) getControllerProvisionerRole() *rbacv1.Role { - return &rbacv1.Role{ - TypeMeta: metav1.TypeMeta{ - Kind: "Role", - APIVersion: "rbac.authorization.k8s.io/v1", - }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-external-provisioner-cfg", false), - Rules: []rbacv1.PolicyRule{ - rbacv1.PolicyRule{ - APIGroups: []string{""}, - Resources: []string{"endpoints"}, - Verbs: []string{ - "get", "watch", "list", "delete", "update", "create", - }, +func (d *PmemCSIDriver) getControllerProvisionerRole(role *rbacv1.Role) { + role.Rules = []rbacv1.PolicyRule{ + rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"endpoints"}, + Verbs: []string{ + "get", "watch", "list", "delete", "update", "create", }, - rbacv1.PolicyRule{ - APIGroups: []string{"coordination.k8s.io"}, - Resources: []string{"leases"}, - Verbs: []string{ - "get", "watch", "list", "delete", "update", "create", - }, + }, + rbacv1.PolicyRule{ + APIGroups: []string{"coordination.k8s.io"}, + Resources: []string{"leases"}, + Verbs: []string{ + "get", "watch", "list", "delete", "update", "create", }, }, } } -func (d *PmemCSIDriver) getControllerProvisionerRoleBinding() *rbacv1.RoleBinding { - return &rbacv1.RoleBinding{ - TypeMeta: metav1.TypeMeta{ - Kind: "RoleBinding", - APIVersion: 
"rbac.authorization.k8s.io/v1", - }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-csi-provisioner-role-cfg", false), - Subjects: []rbacv1.Subject{ - { - Kind: "ServiceAccount", - Name: d.GetHyphenedName() + "-controller", - Namespace: d.namespace, - }, - }, - RoleRef: rbacv1.RoleRef{ - APIGroup: "rbac.authorization.k8s.io", - Kind: "Role", - Name: d.GetHyphenedName() + "-external-provisioner-cfg", +func (d *PmemCSIDriver) getControllerProvisionerRoleBinding(rb *rbacv1.RoleBinding) { + rb.Subjects = []rbacv1.Subject{ + { + Kind: "ServiceAccount", + Name: d.GetHyphenedName() + "-controller", + Namespace: d.namespace, }, } + rb.RoleRef = rbacv1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "Role", + Name: d.GetHyphenedName() + "-external-provisioner-cfg", + } } -func (d *PmemCSIDriver) getControllerProvisionerClusterRole() *rbacv1.ClusterRole { - return &rbacv1.ClusterRole{ - TypeMeta: metav1.TypeMeta{ - Kind: "ClusterRole", - APIVersion: "rbac.authorization.k8s.io/v1", - }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-external-provisioner-runner", true), - Rules: []rbacv1.PolicyRule{ - rbacv1.PolicyRule{ - APIGroups: []string{""}, - Resources: []string{"persistentvolumes"}, - Verbs: []string{ - "get", "watch", "list", "delete", "create", - }, +func (d *PmemCSIDriver) getControllerProvisionerClusterRole(cr *rbacv1.ClusterRole) { + cr.Rules = []rbacv1.PolicyRule{ + rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"persistentvolumes"}, + Verbs: []string{ + "get", "watch", "list", "delete", "create", }, - rbacv1.PolicyRule{ - APIGroups: []string{""}, - Resources: []string{"persistentvolumeclaims"}, - Verbs: []string{ - "get", "watch", "list", "update", - }, + }, + rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"persistentvolumeclaims"}, + Verbs: []string{ + "get", "watch", "list", "update", }, - rbacv1.PolicyRule{ - APIGroups: []string{"storage.k8s.io"}, - Resources: []string{"storageclasses"}, - Verbs: []string{ - "get", "watch", "list", - }, + }, + rbacv1.PolicyRule{ + APIGroups: []string{"storage.k8s.io"}, + Resources: []string{"storageclasses"}, + Verbs: []string{ + "get", "watch", "list", }, - rbacv1.PolicyRule{ - APIGroups: []string{""}, - Resources: []string{"events"}, - Verbs: []string{ - "watch", "list", "create", "update", "patch", - }, + }, + rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"events"}, + Verbs: []string{ + "watch", "list", "create", "update", "patch", }, - rbacv1.PolicyRule{ - APIGroups: []string{"snapshot.storage.k8s.io"}, - Resources: []string{"volumesnapshots", "volumesnapshotcontents"}, - Verbs: []string{ - "get", "list", - }, + }, + rbacv1.PolicyRule{ + APIGroups: []string{"snapshot.storage.k8s.io"}, + Resources: []string{"volumesnapshots", "volumesnapshotcontents"}, + Verbs: []string{ + "get", "list", }, - rbacv1.PolicyRule{ - APIGroups: []string{"storage.k8s.io"}, - Resources: []string{"csinodes"}, - Verbs: []string{ - "get", "list", "watch", - }, + }, + rbacv1.PolicyRule{ + APIGroups: []string{"storage.k8s.io"}, + Resources: []string{"csinodes"}, + Verbs: []string{ + "get", "list", "watch", }, - rbacv1.PolicyRule{ - APIGroups: []string{""}, - Resources: []string{"nodes"}, - Verbs: []string{ - "get", "list", "watch", - }, + }, + rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"nodes"}, + Verbs: []string{ + "get", "list", "watch", }, }, } } -func (d *PmemCSIDriver) getControllerProvisionerClusterRoleBinding() *rbacv1.ClusterRoleBinding { - return 
&rbacv1.ClusterRoleBinding{ - TypeMeta: metav1.TypeMeta{ - Kind: "ClusterRoleBinding", - APIVersion: "rbac.authorization.k8s.io/v1", - }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-csi-provisioner-role", true), - Subjects: []rbacv1.Subject{ - { - Kind: "ServiceAccount", - Name: d.GetHyphenedName() + "-controller", - Namespace: d.namespace, - }, - }, - RoleRef: rbacv1.RoleRef{ - APIGroup: "rbac.authorization.k8s.io", - Kind: "ClusterRole", - Name: d.GetHyphenedName() + "-external-provisioner-runner", +func (d *PmemCSIDriver) getControllerProvisionerClusterRoleBinding(crb *rbacv1.ClusterRoleBinding) { + crb.Subjects = []rbacv1.Subject{ + { + Kind: "ServiceAccount", + Name: d.serviceAccountName(), // GetHyphenedName() + "-controller", + Namespace: d.namespace, }, } + crb.RoleRef = rbacv1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "ClusterRole", + Name: d.provisionerClusterRoleName(), // d.GetHyphenedName() + "-external-provisioner-runner", + } } -func (d *PmemCSIDriver) getControllerStatefulSet() *appsv1.StatefulSet { +func (d *PmemCSIDriver) getControllerStatefulSet(ss *appsv1.StatefulSet) { replicas := int32(1) true := true pmemcsiUser := int64(1000) - ss := &appsv1.StatefulSet{ - TypeMeta: metav1.TypeMeta{ - Kind: "StatefulSet", - APIVersion: "apps/v1", + + ss.Spec = appsv1.StatefulSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": d.GetHyphenedName() + "-controller", + }, }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-controller", false), - Spec: appsv1.StatefulSetSpec{ - Replicas: &replicas, - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "app": d.GetHyphenedName() + "-controller", + ServiceName: d.GetHyphenedName() + "-controller", + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: joinMaps( + d.Spec.Labels, + map[string]string{ + "app": d.GetHyphenedName() + "-controller", + "pmem-csi.intel.com/webhook": "ignore", + }), + Annotations: map[string]string{ + "pmem-csi.intel.com/scrape": "containers", }, }, - ServiceName: d.GetHyphenedName() + "-controller", - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: joinMaps( - d.Spec.Labels, - map[string]string{ - "app": d.GetHyphenedName() + "-controller", - "pmem-csi.intel.com/webhook": "ignore", - }), - Annotations: map[string]string{ - "pmem-csi.intel.com/scrape": "containers", - }, + Spec: corev1.PodSpec{ + SecurityContext: &corev1.PodSecurityContext{ + // Controller pod must run as non-root user + RunAsNonRoot: &true, + RunAsUser: &pmemcsiUser, }, - Spec: corev1.PodSpec{ - SecurityContext: &corev1.PodSecurityContext{ - // Controller pod must run as non-root user - RunAsNonRoot: &true, - RunAsUser: &pmemcsiUser, - }, - ServiceAccountName: d.GetHyphenedName() + "-controller", - Containers: []corev1.Container{ - d.getControllerContainer(), - d.getProvisionerContainer(), - }, - Affinity: &corev1.Affinity{ - NodeAffinity: &corev1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ - NodeSelectorTerms: []corev1.NodeSelectorTerm{ - { - MatchExpressions: []corev1.NodeSelectorRequirement{ - // By default, the controller will run anywhere in the cluster. - // If that isn't desired, the "pmem-csi.intel.com/controller" label - // can be set to "no" or "false" for a node to prevent the controller - // from running there. 
- // - // This is used during testing as a workaround for a particular issue - // on Clear Linux where network configuration randomly fails such that - // the driver which runs on the same node as the controller cannot - // connect to the controller (https://github.com/intel/pmem-csi/issues/555). - // - // It may also be useful for other purposes, in particular for deployment - // through the operator: it has the same rule and currently no other API for - // setting affinity. - { - Key: "pmem-csi.intel.com/controller", - Operator: corev1.NodeSelectorOpNotIn, - Values: []string{"no", "false"}, - }, + ServiceAccountName: d.GetHyphenedName() + "-controller", + Containers: []corev1.Container{ + d.getControllerContainer(), + d.getProvisionerContainer(), + }, + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + // By default, the controller will run anywhere in the cluster. + // If that isn't desired, the "pmem-csi.intel.com/controller" label + // can be set to "no" or "false" for a node to prevent the controller + // from running there. + // + // This is used during testing as a workaround for a particular issue + // on Clear Linux where network configuration randomly fails such that + // the driver which runs on the same node as the controller cannot + // connect to the controller (https://github.com/intel/pmem-csi/issues/555). + // + // It may also be useful for other purposes, in particular for deployment + // through the operator: it has the same rule and currently no other API for + // setting affinity. + { + Key: "pmem-csi.intel.com/controller", + Operator: corev1.NodeSelectorOpNotIn, + Values: []string{"no", "false"}, }, }, }, }, }, }, - Volumes: []corev1.Volume{ - { - Name: "plugin-socket-dir", - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{}, - }, + }, + Volumes: []corev1.Volume{ + { + Name: "plugin-socket-dir", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, }, - { - Name: "registry-cert", - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: d.GetHyphenedName() + "-registry-secrets", - }, + }, + { + Name: "registry-cert", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: d.GetHyphenedName() + "-registry-secrets", }, }, - { - Name: "tmp-dir", - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{}, - }, + }, + { + Name: "tmp-dir", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }, }, }, }, } - - return ss } -func (d *PmemCSIDriver) getNodeDaemonSet() *appsv1.DaemonSet { +func (d *PmemCSIDriver) getNodeDaemonSet(ds *appsv1.DaemonSet) { directoryOrCreate := corev1.HostPathDirectoryOrCreate - ds := &appsv1.DaemonSet{ - TypeMeta: metav1.TypeMeta{ - Kind: "DaemonSet", - APIVersion: "apps/v1", + ds.Spec = appsv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": d.GetHyphenedName() + "-node", + }, }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-node", false), - Spec: appsv1.DaemonSetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "app": d.GetHyphenedName() + "-node", + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: joinMaps( + d.Spec.Labels, + map[string]string{ + "app": 
d.GetHyphenedName() + "-node", + "pmem-csi.intel.com/webhook": "ignore", + }), + Annotations: map[string]string{ + "pmem-csi.intel.com/scrape": "containers", }, }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: joinMaps( - d.Spec.Labels, - map[string]string{ - "app": d.GetHyphenedName() + "-node", - "pmem-csi.intel.com/webhook": "ignore", - }), - Annotations: map[string]string{ - "pmem-csi.intel.com/scrape": "containers", - }, + Spec: corev1.PodSpec{ + NodeSelector: d.Spec.NodeSelector, + Containers: []corev1.Container{ + d.getNodeDriverContainer(), + d.getNodeRegistrarContainer(), }, - Spec: corev1.PodSpec{ - NodeSelector: d.Spec.NodeSelector, - Containers: []corev1.Container{ - d.getNodeDriverContainer(), - d.getNodeRegistrarContainer(), - }, - Volumes: []corev1.Volume{ - { - Name: "socket-dir", - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: d.Spec.KubeletDir + "/plugins/" + d.GetName(), - Type: &directoryOrCreate, - }, + Volumes: []corev1.Volume{ + { + Name: "socket-dir", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: d.Spec.KubeletDir + "/plugins/" + d.GetName(), + Type: &directoryOrCreate, }, }, - { - Name: "registration-dir", - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: d.Spec.KubeletDir + "/plugins_registry/", - Type: &directoryOrCreate, - }, + }, + { + Name: "registration-dir", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: d.Spec.KubeletDir + "/plugins_registry/", + Type: &directoryOrCreate, }, }, - { - Name: "mountpoint-dir", - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: d.Spec.KubeletDir + "/plugins/kubernetes.io/csi", - Type: &directoryOrCreate, - }, + }, + { + Name: "mountpoint-dir", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: d.Spec.KubeletDir + "/plugins/kubernetes.io/csi", + Type: &directoryOrCreate, }, }, - { - Name: "pods-dir", - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: d.Spec.KubeletDir + "/pods", - Type: &directoryOrCreate, - }, + }, + { + Name: "pods-dir", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: d.Spec.KubeletDir + "/pods", + Type: &directoryOrCreate, }, }, - { - Name: "node-cert", - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: d.GetHyphenedName() + "-node-secrets", - }, + }, + { + Name: "node-cert", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: d.GetHyphenedName() + "-node-secrets", }, }, - { - Name: "pmem-state-dir", - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: "/var/lib/" + d.GetName(), - Type: &directoryOrCreate, - }, + }, + { + Name: "pmem-state-dir", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/var/lib/" + d.GetName(), + Type: &directoryOrCreate, }, }, - { - Name: "dev-dir", - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: "/dev", - Type: &directoryOrCreate, - }, + }, + { + Name: "dev-dir", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/dev", + Type: &directoryOrCreate, }, }, - { - Name: "sys-dir", - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: "/sys", - Type: &directoryOrCreate, - }, + }, + { + Name: "sys-dir", + 
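// Most of the volumes in this node DaemonSet follow the same pattern: a hostPath
// mount with DirectoryOrCreate, rooted either under d.Spec.KubeletDir or at a
// fixed system path. A hypothetical helper, not existing PMEM-CSI code, could
// capture that repetition using the directoryOrCreate variable declared above:
//
//	hostPathVolume := func(name, path string) corev1.Volume {
//		return corev1.Volume{
//			Name: name,
//			VolumeSource: corev1.VolumeSource{
//				HostPath: &corev1.HostPathVolumeSource{
//					Path: path,
//					Type: &directoryOrCreate,
//				},
//			},
//		}
//	}
//
// e.g. hostPathVolume("pods-dir", d.Spec.KubeletDir+"/pods").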
VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/sys", + Type: &directoryOrCreate, }, }, }, @@ -903,8 +1138,6 @@ func (d *PmemCSIDriver) getNodeDaemonSet() *appsv1.DaemonSet { }, }, } - - return ds } func (d *PmemCSIDriver) getControllerCommand() []string { @@ -1143,6 +1376,7 @@ func (d *PmemCSIDriver) getMetricsPorts(port int32) []corev1.ContainerPort { { Name: "metrics", ContainerPort: port, + Protocol: "TCP", }, } } @@ -1153,7 +1387,6 @@ func (d *PmemCSIDriver) getObjectMeta(name string, isClusterResource bool) metav OwnerReferences: []metav1.OwnerReference{ d.GetOwnerReference(), }, - Labels: d.Spec.Labels, } if !isClusterResource { meta.Namespace = d.namespace diff --git a/pkg/pmem-csi-operator/controller/deployment/deployment_controller.go b/pkg/pmem-csi-operator/controller/deployment/deployment_controller.go index 879d4a8ed7..0e3cbd7cf0 100644 --- a/pkg/pmem-csi-operator/controller/deployment/deployment_controller.go +++ b/pkg/pmem-csi-operator/controller/deployment/deployment_controller.go @@ -10,18 +10,22 @@ import ( "context" "fmt" "os" - "reflect" + "sync" "time" pmemcsiv1alpha1 "github.com/intel/pmem-csi/pkg/apis/pmemcsi/v1alpha1" "github.com/intel/pmem-csi/pkg/k8sutil" pmemcontroller "github.com/intel/pmem-csi/pkg/pmem-csi-operator/controller" "github.com/intel/pmem-csi/pkg/version" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + storagev1beta1 "k8s.io/api/storage/v1beta1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + apiruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" v1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" @@ -29,8 +33,10 @@ import ( "k8s.io/kubectl/pkg/scheme" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -47,11 +53,11 @@ func Add(mgr manager.Manager, opts pmemcontroller.ControllerOptions) error { if err != nil { return err } - return add(mgr, r) + return add(mgr, r.(*ReconcileDeployment)) } // add adds a new Controller to mgr with r as the reconcile.Reconciler -func add(mgr manager.Manager, r reconcile.Reconciler) error { +func add(mgr manager.Manager, r *ReconcileDeployment) error { // Create a new controller c, err := controller.New("deployment-controller", mgr, controller.Options{Reconciler: r}) if err != nil { @@ -59,13 +65,97 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { return err } + p := predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + klog.V(4).Infof("Got deployment update event on: %s(Type: %T)", e.MetaOld.GetName(), e.MetaOld) + old := e.ObjectOld.(*pmemcsiv1alpha1.Deployment) + new := e.ObjectNew.(*pmemcsiv1alpha1.Deployment) + if old.GetGeneration() == new.GetGeneration() { + // No changes registered + return false + } + // We are intersted in only spec changes, not CR status changes + changes := old.Compare(new) + klog.Infof("CR changes: %v", changes) + return len(changes) != 0 + }, + DeleteFunc: func(e event.DeleteEvent) bool { + klog.Infof("Deployment '%s' deleted, removing local reference", e.Meta.GetName()) + // Deployment 
CR deleted, remove its reference from cache.
+		// Objects owned by it are automatically garbage collected.
+		r.deleteDeployment(e.Meta.GetName())
+		// We already handled the event here,
+		// so no further reconcile is required.
+		return false
+	},
+	}
+
 	// Watch for changes to primary resource Deployment
-	err = c.Watch(&source.Kind{Type: &pmemcsiv1alpha1.Deployment{}}, &handler.EnqueueRequestForObject{})
-	if err != nil {
+	if err := c.Watch(&source.Kind{Type: &pmemcsiv1alpha1.Deployment{}}, &handler.EnqueueRequestForObject{}, p); err != nil {
 		klog.Errorf("Deployment.Add: watch error: %v", err)
 		return err
 	}
 
+	// Predicate functions for sub-object changes.
+	// We do not want to pollute the deployment reconcile loop with
+	// sub-object changes, so we handle them individually here.
+	// All these event handlers therefore return 'false' so that the event
+	// is not propagated further.
+	// One exception: if handling fails here, we pass the event on to the
+	// reconcile loop, which recognizes such requests and just requeues
+	// them, so that the failure is retried.
+	eventFunc := func(event string, meta metav1.Object, obj apiruntime.Object) bool {
+		// Get the deployment that owns this object
+		d, err := r.getDeploymentFor(meta)
+		if err != nil {
+			// The owner might have been deleted already,
+			// so we can safely ignore this event.
+			return false
+		}
+		if err := d.HandleEvent(meta, obj, r); err != nil {
+			klog.Warningf("Error occurred while handling %s event: %v", event, err)
+			return true
+		}
+		return false
+	}
+	sop := predicate.Funcs{
+		DeleteFunc: func(e event.DeleteEvent) bool {
+			klog.V(4).Infof("'%s' of type %T deleted", e.Meta.GetName(), e.Object)
+			return eventFunc("delete", e.Meta, e.Object)
+		},
+		UpdateFunc: func(e event.UpdateEvent) bool {
+			klog.V(4).Infof("'%s' of type %T updated", e.MetaOld.GetName(), e.ObjectOld)
+			return eventFunc("update", e.MetaOld, e.ObjectOld)
+		},
+		CreateFunc: func(e event.CreateEvent) bool {
+			return false
+		},
+	}
+
+	// All possible object types for a Deployment CR
+	// that would require redeploying on change.
+	subresources := []runtime.Object{
+		&corev1.Secret{},
+		&rbacv1.Role{},
+		&rbacv1.RoleBinding{},
+		&rbacv1.ClusterRole{},
+		&rbacv1.ClusterRoleBinding{},
+		&corev1.Service{},
+		&storagev1beta1.CSIDriver{},
+		&appsv1.StatefulSet{},
+		&appsv1.DaemonSet{},
+	}
+
+	for _, resource := range subresources {
+		if err := c.Watch(&source.Kind{Type: resource}, &handler.EnqueueRequestForOwner{
+			IsController: true,
+			OwnerType:    &pmemcsiv1alpha1.Deployment{},
+		}, sop); err != nil {
+			klog.Errorf("Deployment.Add: watch error: %v", err)
+			return err
+		}
+	}
+
 	return nil
 }
 
@@ -85,8 +175,9 @@ type ReconcileDeployment struct {
 	// container image used for deploying the operator
 	containerImage string
 	// known deployments
-	deployments    map[string]*pmemcsiv1alpha1.Deployment
-	reconcileHooks map[ReconcileHook]struct{}
+	deployments      map[string]*pmemcsiv1alpha1.Deployment
+	deploymentsMutex sync.Mutex
+	reconcileHooks   map[ReconcileHook]struct{}
 }
 
 // NewReconcileDeployment creates new deployment reconciler
@@ -117,14 +208,15 @@ func NewReconcileDeployment(client client.Client, opts pmemcontroller.Controller
 	evRecorder := evBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "pmem-csi-operator"})
 
 	return &ReconcileDeployment{
-		client:         client,
-		evBroadcaster:  evBroadcaster,
-		evRecorder:     evRecorder,
-		k8sVersion:     opts.K8sVersion,
-		namespace:      opts.Namespace,
-		containerImage: opts.DriverImage,
-		deployments:    
map[string]*pmemcsiv1alpha1.Deployment{}, - reconcileHooks: map[ReconcileHook]struct{}{}, + client: client, + evBroadcaster: evBroadcaster, + evRecorder: evRecorder, + k8sVersion: opts.K8sVersion, + namespace: opts.Namespace, + containerImage: opts.DriverImage, + deployments: map[string]*pmemcsiv1alpha1.Deployment{}, + deploymentsMutex: sync.Mutex{}, + reconcileHooks: map[ReconcileHook]struct{}{}, }, nil } @@ -137,30 +229,20 @@ func (r *ReconcileDeployment) Reconcile(request reconcile.Request) (reconcile.Re var requeue bool var err error - requeueDelay := 1 * time.Minute requeueDelayOnError := 2 * time.Minute // Fetch the Deployment instance deployment := &pmemcsiv1alpha1.Deployment{} err = r.client.Get(context.TODO(), request.NamespacedName, deployment) if err != nil { - if errors.IsNotFound(err) { - // Request object not found, could have been deleted after reconcile request. - // Owned objects are automatically garbage collected. - // Remove the reference from our records - klog.Infof("Deployment '%s' deleted, removing local reference", request.Name) - delete(r.deployments, request.Name) - return reconcile.Result{}, nil - } - // Error reading the object - requeue the request. + klog.V(3).Infof("Failed to retrieve object '%s' to reconcile", request.Name) + // One reason for this could be a failed predicate event handler of + // sub-objects. So requeue the request so that the same predicate + // handle could be called on that object. return reconcile.Result{Requeue: requeue, RequeueAfter: requeueDelayOnError}, err } - for f := range r.reconcileHooks { - if f != nil { - (*f)(deployment) - } - } + klog.Infof("Reconciling deployment %q", deployment.GetName()) // If the deployment has already been marked for deletion, // then we don't need to do anything for it because the @@ -170,37 +252,51 @@ func (r *ReconcileDeployment) Reconcile(request reconcile.Request) (reconcile.Re return reconcile.Result{Requeue: false}, nil } - patch := client.MergeFrom(deployment.DeepCopy()) - d := &PmemCSIDriver{deployment, r.namespace, r.k8sVersion} + for f := range r.reconcileHooks { + if f != nil { + (*f)(deployment) + } + } + + if deployment.Status.Phase == pmemcsiv1alpha1.DeploymentPhaseNew { + /* New deployment */ + r.evRecorder.Event(deployment, corev1.EventTypeNormal, pmemcsiv1alpha1.EventReasonNew, "Processing new driver deployment") + } + + // Cache the deployment + r.saveDeployment(deployment) + + dep := deployment.DeepCopy() // update status defer func() { - klog.Infof("Updating deployment status....") - d.Deployment.Status.LastUpdated = metav1.Now() - // Passing copy of CR to patch as the fake client used in tests - // will write back the changes to both status and spec. - copy := d.Deployment.DeepCopy() - if statusErr := r.client.Status().Patch(context.TODO(), copy, patch); statusErr != nil { + klog.Infof("Updating deployment status...") + // Some clients(fake client used in tests) do not support patching status reliably + // and updates even spec changes. So, revert any spec changes(like deployment defaults) we made. + // Revert back spec changes, those are not supposed to get saved on the API server. 
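// The net effect of this deferred status update, shown linearly for clarity
// (dep is the working copy whose status is modified during reconciliation,
// deployment is the object as read from the API server); this restates the
// code below, it is not additional logic:
//
//	dep := deployment.DeepCopy()
//	// ... reconcile logic mutates dep.Status ...
//	deployment.Spec.DeepCopyInto(&dep.Spec)          // drop local spec changes such as defaults
//	patch := client.MergeFrom(deployment.DeepCopy()) // patch carries only the status delta
//	err := r.PatchDeploymentStatus(dep, patch)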
+		deployment.Spec.DeepCopyInto(&dep.Spec)
+
+		if err := r.PatchDeploymentStatus(dep, client.MergeFrom(deployment.DeepCopy())); err != nil {
 			klog.Warningf("failed to update status %q for deployment %q: %v",
-				d.Deployment.Status.Phase, d.Name, statusErr)
+				dep.Status.Phase, dep.Name, err)
 		}
-		d.Deployment.Status = copy.Status
-	}()
 
-	requeue, err = d.Reconcile(r)
+		klog.Infof("End of reconcile.")
+	}()
 
-	klog.Infof("Requeue: %t, error: %v", requeue, err)
+	d := &PmemCSIDriver{dep, r.namespace, r.k8sVersion}
+	if err := d.Reconcile(r); err != nil {
+		klog.Infof("Reconcile error: %v", err)
+		dep.Status.Phase = pmemcsiv1alpha1.DeploymentPhaseFailed
+		r.evRecorder.Event(dep, corev1.EventTypeWarning, pmemcsiv1alpha1.EventReasonFailed, err.Error())
 
-	if !requeue {
-		return reconcile.Result{}, err
+		return reconcile.Result{Requeue: true, RequeueAfter: requeueDelayOnError}, err
 	}
 
-	delay := requeueDelay
-	if err != nil {
-		delay = requeueDelayOnError
-	}
+	dep.Status.Phase = pmemcsiv1alpha1.DeploymentPhaseRunning
+	r.evRecorder.Event(dep, corev1.EventTypeNormal, pmemcsiv1alpha1.EventReasonRunning, "Driver deployment successful")
 
-	return reconcile.Result{Requeue: requeue, RequeueAfter: delay}, err
+	return reconcile.Result{}, nil
 }
 
 func (r *ReconcileDeployment) Namespace() string {
@@ -303,6 +399,64 @@ func (r *ReconcileDeployment) Delete(obj runtime.Object) error {
 	return r.client.Delete(context.TODO(), obj)
 }
 
+// PatchDeploymentStatus patches the given deployment CR status
+func (r *ReconcileDeployment) PatchDeploymentStatus(dep *pmemcsiv1alpha1.Deployment, patch client.Patch) error {
+	dep.Status.LastUpdated = metav1.Now()
+	// Pass a copy of the CR to Patch because the fake client used in tests
+	// writes back the changes to both status and spec.
+	if err := r.client.Status().Patch(context.TODO(), dep.DeepCopy(), patch); err != nil {
+		return err
+	}
+	// Update the status of the cached deployment.
+	r.cacheDeploymentStatus(dep.GetName(), dep.Status)
+	return nil
+}
+
+func (r *ReconcileDeployment) saveDeployment(d *pmemcsiv1alpha1.Deployment) {
+	r.deploymentsMutex.Lock()
+	defer r.deploymentsMutex.Unlock()
+	r.deployments[d.Name] = d
+}
+
+func (r *ReconcileDeployment) getDeployment(name string) *pmemcsiv1alpha1.Deployment {
+	r.deploymentsMutex.Lock()
+	defer r.deploymentsMutex.Unlock()
+	return r.deployments[name]
+}
+
+func (r *ReconcileDeployment) deleteDeployment(name string) {
+	r.deploymentsMutex.Lock()
+	defer r.deploymentsMutex.Unlock()
+	delete(r.deployments, name)
+}
+
+func (r *ReconcileDeployment) cacheDeploymentStatus(name string, status pmemcsiv1alpha1.DeploymentStatus) {
+	r.deploymentsMutex.Lock()
+	defer r.deploymentsMutex.Unlock()
+	if d, ok := r.deployments[name]; ok {
+		status.DeepCopyInto(&d.Status)
+	}
+}
+
+func (r *ReconcileDeployment) getDeploymentFor(obj metav1.Object) (*PmemCSIDriver, error) {
+	//klog.Infof("MapRequest: %s(%T)", obj.Meta.GetName(), obj.Meta)
+	r.deploymentsMutex.Lock()
+	defer r.deploymentsMutex.Unlock()
+	for name, d := range r.deployments {
+		selfRef := d.GetOwnerReference()
+		if isOwnedBy(obj, &selfRef) {
+			klog.Infof("Object '%s' is owned by the %q deployment", obj.GetName(), name)
+			deployment := d.DeepCopy()
+			if err := deployment.EnsureDefaults(r.containerImage); err != nil {
+				return nil, err
+			}
+			return &PmemCSIDriver{deployment, r.namespace, r.k8sVersion}, nil
+		}
+	}
+
+	return nil, fmt.Errorf("not found")
+}
+
 // containerImage returns container image name used by operator Pod
 func containerImage(cs *kubernetes.Clientset, namespace 
string) (string, error) { const podNameEnv = "POD_NAME" @@ -339,7 +493,7 @@ func containerImage(cs *kubernetes.Clientset, namespace string) (string, error) // isOwnedBy checks if expectedOwner is in the object's owner references list func isOwnedBy(object metav1.Object, expectedOwner *metav1.OwnerReference) bool { for _, owner := range object.GetOwnerReferences() { - if reflect.DeepEqual(&owner, expectedOwner) { + if owner.UID == expectedOwner.UID { return true } }
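// Comparing UIDs is more robust than reflect.DeepEqual on the whole
// OwnerReference: a reference built locally with GetOwnerReference() can differ
// from the stored one in optional pointer fields such as Controller or
// BlockOwnerDeletion, so a deep comparison may miss a genuine owner, while the
// UID uniquely identifies the owning object. Typical use, mirroring
// getDeploymentFor above (obj being whichever sub-object triggered the event):
//
//	selfRef := deployment.GetOwnerReference()
//	if isOwnedBy(obj, &selfRef) {
//		// obj belongs to this PMEM-CSI deployment
//	}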