diff --git a/pkg/apis/pmemcsi/v1alpha1/deployment_types.go b/pkg/apis/pmemcsi/v1alpha1/deployment_types.go index e1f8bb0f3c..04e8ff81fc 100644 --- a/pkg/apis/pmemcsi/v1alpha1/deployment_types.go +++ b/pkg/apis/pmemcsi/v1alpha1/deployment_types.go @@ -55,6 +55,7 @@ const ( // Related issue : https://github.com/kubernetes-sigs/controller-tools/issues/478 // Fails setting min/max for integers: https://github.com/helm/helm/issues/5806 +// +k8s:deepcopy-gen=true // DeploymentSpec defines the desired state of Deployment type DeploymentSpec struct { // Important: Run "make operator-generate-k8s" to regenerate code after modifying this file @@ -109,6 +110,66 @@ type DeploymentSpec struct { KubeletDir string `json:"kubeletDir,omitempty"` } +// DeploymentConditionType is the type used to represent a deployment status condition +type DeploymentConditionType string + +const ( + // CertsVerified means the provided deployment secrets are verified and valid for usage + CertsVerified DeploymentConditionType = "CertsVerified" + // CertsReady means secrets/certificates required for running the PMEM-CSI driver + // are ready and the deployment can progress further + CertsReady DeploymentConditionType = "CertsReady" + // DriverDeployed means that all the sub-resources required for the deployment CR + // got created + DriverDeployed DeploymentConditionType = "DriverDeployed" +) + +// +k8s:deepcopy-gen=true +type DeploymentCondition struct { + // Type of condition. + Type DeploymentConditionType `json:"type"` + // Status of the condition, one of True, False, Unknown. + Status corev1.ConditionStatus `json:"status"` + // Reason is the human readable text that explains why this condition is in this state + // +optional + Reason string `json:"reason,omitempty"` + // Last time the condition was probed. + // +optional + LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"` +} + +type DriverType int + +const ( + ControllerDriver DriverType = iota + NodeDriver +) + +func (t DriverType) String() string { + switch t { + case ControllerDriver: + return "controller" + case NodeDriver: + return "node" + } + return "" +} + +// +k8s:deepcopy-gen=true +type DriverStatus struct { + // Type represents the type of the driver: controller or node + Type string `json:"type"` + // Status represents the driver status: Ready or NotReady + Status string `json:"status"` + // Reason represents the human readable text that explains why the + // driver is in this state.
+ Reason string `json:"reason"` + // LastUpdated time of the driver status + LastUpdated metav1.Time `json:"lastUpdated,omitempty"` +} + +// +k8s:deepcopy-gen=true + // DeploymentStatus defines the observed state of Deployment type DeploymentStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster @@ -116,6 +177,9 @@ type DeploymentStatus struct { // Phase indicates the state of the deployment Phase DeploymentPhase `json:"phase,omitempty"` + // Conditions represents the current state conditions of the deployment + Conditions []DeploymentCondition `json:"conditions,omitempty"` + Components []DriverStatus `json:"driverComponents,omitempty"` // LastUpdated time of the deployment status LastUpdated metav1.Time `json:"lastUpdated,omitempty"` } @@ -256,6 +320,35 @@ func (c DeploymentChange) String() string { }[c] } +func (d *Deployment) SetCondition(t DeploymentConditionType, state corev1.ConditionStatus, reason string) { + for i, c := range d.Status.Conditions { + if c.Type == t { + d.Status.Conditions[i].Status = state + d.Status.Conditions[i].Reason = reason + d.Status.Conditions[i].LastUpdateTime = metav1.Now() + return + } + } + d.Status.Conditions = append(d.Status.Conditions, DeploymentCondition{ + Type: t, + Status: state, + Reason: reason, + LastUpdateTime: metav1.Now(), + }) +} + +func (d *Deployment) SetDriverStatus(t DriverType, status, reason string) { + if d.Status.Components == nil { + d.Status.Components = make([]DriverStatus, 2) + } + d.Status.Components[t] = DriverStatus{ + Type: t.String(), + Status: status, + Reason: reason, + LastUpdated: metav1.Now(), + } +} + // EnsureDefaults make sure that the deployment object has all defaults set properly func (d *Deployment) EnsureDefaults(operatorImage string) error { if d.Spec.Image == "" { @@ -420,6 +513,30 @@ func (d *Deployment) GetOwnerReference() metav1.OwnerReference { } } +// HaveCertificatesConfigured checks if the deployment is configured with its +// own certificates. It returns true if a complete set is provided, false if +// none are provided, or an error if the configuration is incomplete. +func (d *Deployment) HaveCertificatesConfigured() (bool, error) { + // Encoded private keys and certificates + caCert := d.Spec.CACert + registryPrKey := d.Spec.RegistryPrivateKey + ncPrKey := d.Spec.NodeControllerPrivateKey + registryCert := d.Spec.RegistryCert + ncCert := d.Spec.NodeControllerCert + + // sanity check + if caCert == nil { + if registryCert != nil || ncCert != nil { + return false, fmt.Errorf("incomplete deployment configuration: missing root CA certificate by which the provided certificates are signed") + } + return false, nil + } else if registryCert == nil || registryPrKey == nil || ncCert == nil || ncPrKey == nil { + return false, fmt.Errorf("incomplete deployment configuration: certificates and corresponding private keys must be provided") + } + + return true, nil +} + func GetDeploymentCRDSchema() *apiextensions.JSONSchemaProps { One := float64(1) Hundred := float64(100) diff --git a/pkg/apis/pmemcsi/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/pmemcsi/v1alpha1/zz_generated.deepcopy.go index da070eacd8..b74a372963 100644 --- a/pkg/apis/pmemcsi/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/pmemcsi/v1alpha1/zz_generated.deepcopy.go @@ -36,6 +36,22 @@ func (in *Deployment) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *DeploymentCondition) DeepCopyInto(out *DeploymentCondition) { + *out = *in + in.LastUpdateTime.DeepCopyInto(&out.LastUpdateTime) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeploymentCondition. +func (in *DeploymentCondition) DeepCopy() *DeploymentCondition { + if in == nil { + return nil + } + out := new(DeploymentCondition) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DeploymentList) DeepCopyInto(out *DeploymentList) { *out = *in @@ -135,6 +151,20 @@ func (in *DeploymentSpec) DeepCopy() *DeploymentSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DeploymentStatus) DeepCopyInto(out *DeploymentStatus) { *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]DeploymentCondition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.Components != nil { + in, out := &in.Components, &out.Components + *out = make([]DriverStatus, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } in.LastUpdated.DeepCopyInto(&out.LastUpdated) } @@ -147,3 +177,19 @@ func (in *DeploymentStatus) DeepCopy() *DeploymentStatus { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DriverStatus) DeepCopyInto(out *DriverStatus) { + *out = *in + in.LastUpdated.DeepCopyInto(&out.LastUpdated) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DriverStatus. +func (in *DriverStatus) DeepCopy() *DriverStatus { + if in == nil { + return nil + } + out := new(DriverStatus) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/pmem-csi-operator/controller/deployment/controller_driver.go b/pkg/pmem-csi-operator/controller/deployment/controller_driver.go index 89b0d4b7ab..58001a5bfc 100644 --- a/pkg/pmem-csi-operator/controller/deployment/controller_driver.go +++ b/pkg/pmem-csi-operator/controller/deployment/controller_driver.go @@ -67,194 +67,509 @@ var AllObjectTypes = []schema.GroupVersionKind{ type PmemCSIDriver struct { *api.Deployment // operators namespace used for creating sub-resources - namespace string - + namespace string k8sVersion version.Version } -// Reconcile reconciles the driver deployment -func (d *PmemCSIDriver) Reconcile(r *ReconcileDeployment) (bool, error) { - changes := map[api.DeploymentChange]struct{}{} +type ObjectPatch struct { + obj apiruntime.Object + patch client.Patch +} - oldDeployment, foundInCache := r.deployments[d.Name] - if !foundInCache { - /* New deployment */ - r.evRecorder.Event(d, corev1.EventTypeNormal, api.EventReasonNew, "Processing new driver deployment") +func NewObjectPatch(obj, copy apiruntime.Object) *ObjectPatch { + return &ObjectPatch{ + obj: obj, + patch: client.MergeFrom(copy), } +} - copy := d.DeepCopy() +// IsNew checks if the object is a new object, i.e, the object is not +// yet stored with APIServer +func (op ObjectPatch) IsNew() bool { + if op.obj != nil { + // We ignore only possible error - client.errNotObject + // and treat it's as new object + if o, err := meta.Accessor(op.obj); err == nil { + // An object registered with API serve will have + // a non-empty(zero) resource version + return o.GetResourceVersion() == "" + } + } + return true +} + +// 
Reconcile reconciles the driver deployment +func (d *PmemCSIDriver) Reconcile(r *ReconcileDeployment) error { if err := d.EnsureDefaults(r.containerImage); err != nil { d.Deployment.Status.Phase = api.DeploymentPhaseFailed r.evRecorder.Event(d, corev1.EventTypeWarning, api.EventReasonFailed, err.Error()) - return true, err - } - - if foundInCache { - // Cached deployment does not have defaults stored in it. - // So, reset those defaults before comparing otherwise - // those set defaults `d.Deployment` will be detected as - // changes. - cached := oldDeployment.DeepCopy() - if err := cached.EnsureDefaults(r.containerImage); err != nil { - r.evRecorder.Event(d, corev1.EventTypeWarning, api.EventReasonFailed, err.Error()) - return true, err + return err + } + + klog.Infof("Deployment: %q, state %q ", d.Name, d.Status.Phase) + var allObjects []apiruntime.Object + redeployAll := func() error { + var o apiruntime.Object + var err error + if s, err := d.redeploySecrets(r); err != nil { + return err + } else { + for _, o := range s { + allObjects = append(allObjects, o) + } } - changes = d.Compare(cached) - } - - klog.Infof("Deployment: %q, state %q, changes %v, in cache %v", d.Name, d.Status.Phase, changes, foundInCache) - - requeue, err := d.reconcileDeploymentChanges(r, changes, foundInCache) - // Some clients(fake client used in tests) do not support patching status reliably - // and updates even spec changes. So, revert any spec changes(like deployment defaults) we made. - // Those are not supposed to get saved on the API server. - d.Deployment.Spec = copy.Spec - if err == nil { - // If everything ok, update local cache - r.deployments[d.Name] = d.Deployment - - // TODO: wait for functional driver before entering "running" phase. - // For now we go straight to it. 
- d.Status.Phase = api.DeploymentPhaseRunning - r.evRecorder.Event(d, corev1.EventTypeNormal, api.EventReasonRunning, "Driver deployment successful") - } else if d.Status.Phase == api.DeploymentPhaseNew { - d.Status.Phase = api.DeploymentPhaseFailed - r.evRecorder.Event(d, corev1.EventTypeWarning, api.EventReasonFailed, err.Error()) + if o, err = d.redeployProvisionerRole(r); err != nil { + return fmt.Errorf("failed to update RBAC role: %v", err) + } + allObjects = append(allObjects, o) + if o, err = d.redeployProvisionerRoleBinding(r); err != nil { + return fmt.Errorf("failed to update RBAC role bindings: %v", err) + } + allObjects = append(allObjects, o) + if o, err = d.redeployProvisionerClusterRole(r); err != nil { + return fmt.Errorf("failed to update RBAC cluster role: %v", err) + } + allObjects = append(allObjects, o) + if o, err = d.redeployProvisionerClusterRoleBinding(r); err != nil { + return fmt.Errorf("failed to update RBAC cluster role bindings: %v", err) + } + allObjects = append(allObjects, o) + if o, err = d.redeployServiceAccount(r); err != nil { + return fmt.Errorf("failed to update service account: %v", err) + } + allObjects = append(allObjects, o) + if o, err = d.redeployControllerService(r); err != nil { + return fmt.Errorf("failed to update controller service: %v", err) + } + allObjects = append(allObjects, o) + if o, err = d.redeployMetricsService(r); err != nil { + return fmt.Errorf("failed to update controller service: %v", err) + } + allObjects = append(allObjects, o) + if o, err = d.redeployCSIDriver(r); err != nil { + return fmt.Errorf("failed to update CSI driver: %v", err) + } + allObjects = append(allObjects, o) + if o, err = d.redeployControllerDriver(r); err != nil { + return fmt.Errorf("failed to update controller driver: %v", err) + } + allObjects = append(allObjects, o) + if o, err = d.redeployNodeDriver(r); err != nil { + return fmt.Errorf("failed to update node driver: %v", err) + } + allObjects = append(allObjects, o) + return nil + } + + if err := redeployAll(); err != nil { + d.SetCondition(api.DriverDeployed, corev1.ConditionFalse, err.Error()) + return err + } + + d.SetCondition(api.DriverDeployed, corev1.ConditionTrue, "Driver deployed successfully.") + + klog.Infof("Deployed '%d' objects.", len(allObjects)) + // FIXME(avalluri): Limit the obsolete object deletion either to only version upgrades + // or on operator restart. + if err := d.deleteObsoleteObjects(r, allObjects); err != nil { + return fmt.Errorf("Delete obsolete objects failed with error: %v", err) } - return requeue, err + return nil } -// reconcileDeploymentChanges examines the changes and updates the appropriate objects. -// In case of foundInCache is false, all the objects gets refreshed/updated. -func (d *PmemCSIDriver) reconcileDeploymentChanges(r *ReconcileDeployment, changes map[api.DeploymentChange]struct{}, foundInCache bool) (bool, error) { - updateController := false - updateNodeDriver := false - updateSecrets := false - updateAll := false // Update all objects of the deployment +// getSubObject retrieves the latest revision of given object type from the API server +// And checks if that object is owned by the current deployment CR +func (d *PmemCSIDriver) getSubObject(r *ReconcileDeployment, obj apiruntime.Object) error { + objMeta, err := meta.Accessor(obj) + if err != nil { + return fmt.Errorf("internal error %T: %v", obj, err) + } - if !foundInCache { - // Running deployment not found in cache, possibly result of operator - // restart, refresh all objects. 
- updateAll = true + klog.V(4).Infof("Getting object %q of type %T", objMeta.GetName(), obj) + if err := r.Get(obj); err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + ownerRef := d.GetOwnerReference() + if !isOwnedBy(objMeta, &ownerRef) { + return fmt.Errorf("'%s' of type %T is not owned by '%s'", objMeta.GetName(), obj, ownerRef.Name) } - for c := range changes { - var err error - switch c { - case api.ControllerResources, api.ProvisionerImage: - updateController = true - case api.NodeResources, api.NodeRegistrarImage: - updateNodeDriver = true - case api.DriverImage, api.LogLevel, api.PullPolicy: - updateController = true - updateNodeDriver = true - case api.DriverMode: - updateNodeDriver = true - case api.NodeSelector: - updateNodeDriver = true - case api.PMEMPercentage: - updateNodeDriver = true - case api.Labels: - updateAll = true - case api.CACertificate, api.RegistryCertificate, api.NodeControllerCertificate: - updateSecrets = true - case api.KubeletDir: - updateNodeDriver = true + return nil +} + +// updateSubObject writes the object changes to the API server. +func (d *PmemCSIDriver) updateSubObject(r *ReconcileDeployment, op *ObjectPatch) error { + objMeta, err := meta.Accessor(op.obj) + if err != nil { + return fmt.Errorf("internal error %T: %v", op.obj, err) + } + + // Set labels just before creating/patching. + // NOTE: Labels cannot be set before creating the client.Patch + // otherwise they get missed from the patch diff. + objMeta.SetLabels(d.Spec.Labels) + if op.IsNew() { + // For unknown reason client.Create() clearing off the + // GVK on obj, So restore it manually. + gvk := op.obj.GetObjectKind().GroupVersionKind() + err := r.client.Create(context.TODO(), op.obj) + op.obj.GetObjectKind().SetGroupVersionKind(gvk) + return err + } + klog.V(4).Infof("Updating object %q of type %T", objMeta.GetName(), op.obj) + return r.client.Patch(context.TODO(), op.obj, op.patch) +} + +// redeploySecrets ensures that the secrets get (re)deployed that are +// required for running the driver. +// +// First it checks if the deployment is configured with the needed certificates. +// If provided, validate and (re)create secrets with using them. +// Else, provision new certificates only if no existing secrets and deploy. 
+func (d *PmemCSIDriver) redeploySecrets(r *ReconcileDeployment) ([]*corev1.Secret, error) { + rs := &corev1.Secret{ + TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"}, + ObjectMeta: d.getObjectMeta(d.registrySecretName(), false), + } + if err := d.getSubObject(r, rs); err != nil { + return nil, err + } + rop := NewObjectPatch(rs, rs.DeepCopy()) + + ns := &corev1.Secret{ + TypeMeta: metav1.TypeMeta{Kind: "Secret", APIVersion: "v1"}, + ObjectMeta: d.getObjectMeta(d.nodeSecretName(), false), + } + if err := d.getSubObject(r, ns); err != nil { + return nil, err + } + nop := NewObjectPatch(ns, ns.DeepCopy()) + + update := func() error { + d.getRegistrySecrets(rs) + if err := d.updateSubObject(r, rop); err != nil { + return fmt.Errorf("Failed to update registry secrets: %v", err) } - if err != nil { - klog.Warningf("Incompatible change of deployment occurred: %v", err) + d.getNodeSecrets(ns) + if err := d.updateSubObject(r, nop); err != nil { + return fmt.Errorf("Failed to update node secrets: %v", err) } + return nil } - objects := []apiruntime.Object{} + certsProvided, err := d.HaveCertificatesConfigured() + if err != nil { + d.SetCondition(api.CertsVerified, corev1.ConditionFalse, err.Error()) + return nil, err + } - // Force update all deployment objects - if updateAll { - klog.Infof("Updating all objects for deployment %q", d.Name) - objs, err := d.getDeploymentObjects() - if err != nil { - return true, err - } - objects = append(objects, objs...) - - // If not found cache might be result of operator restart - // check if this deployment has any objects to be deleted - if !foundInCache { - if err := d.deleteObsoleteObjects(r, objs); err != nil { - klog.Infof("Failed to delete obsolete objects: %v", err) - return true, err - } + if certsProvided { + // Use provided certificates + if err := d.validateCertificates(); err != nil { + d.SetCondition(api.CertsVerified, corev1.ConditionFalse, err.Error()) + return nil, err } - } else { - if updateSecrets { - objs, err := d.getSecrets() - if err != nil { - return true, err - } - objects = append(objects, objs...) + d.SetCondition(api.CertsVerified, corev1.ConditionTrue, "Driver certificates validated.") + + // update secrets if required + if err := update(); err != nil { + d.SetCondition(api.CertsReady, corev1.ConditionFalse, err.Error()) + return nil, err } - if updateController { - klog.Infof("Updating controller driver for deployment %q", d.Name) - objects = append(objects, d.getControllerStatefulSet()) + } else if rop.IsNew() || nop.IsNew() { + // Provision new self-signed certificates if not already present + if err := d.provisionCertificates(); err != nil { + d.SetCondition(api.CertsReady, corev1.ConditionFalse, err.Error()) + return nil, err } - if updateNodeDriver { - klog.Infof("Updating node driver for deployment %q", d.Name) - objects = append(objects, d.getNodeDaemonSet()) + + if err := update(); err != nil { + d.SetCondition(api.CertsReady, corev1.ConditionFalse, err.Error()) + return nil, err } } - for _, obj := range objects { - // Services needs special treatment as they have some immutable field(s) - // So, we cannot refresh the existing one with new service object. 
- if s, ok := obj.(*corev1.Service); ok { - existingService := &corev1.Service{ - TypeMeta: s.TypeMeta, - ObjectMeta: s.ObjectMeta, - } - err := r.Get(existingService) - if err != nil { - if !errors.IsNotFound(err) { - return true, err - } - // Create the missing service now - klog.Infof("'%s' service not found, creating new one", s.GetName()) - if err := r.Create(s); err != nil { - return true, err - } - continue - } + d.SetCondition(api.CertsReady, corev1.ConditionTrue, "Driver certificates are available.") + return []*corev1.Secret{rs, ns}, nil +} - existingService.Spec.Ports = []corev1.ServicePort{} - for _, p := range s.Spec.Ports { - existingService.Spec.Ports = append(existingService.Spec.Ports, p) - } - existingService.Spec.Selector = s.Spec.Selector - if len(s.Labels) > 0 { - if existingService.Labels == nil { - existingService.Labels = map[string]string{} - } - for key, value := range s.Labels { - existingService.Labels[key] = value - } - } - klog.Infof("updating service '%s' service ports and selector", s.GetName()) - if err := r.Update(existingService); err != nil { - return true, err - } - } else { - if err := r.UpdateOrCreate(obj); err != nil { - return true, err - } +// redeployNodeDriver deploys the node daemon set and records its status on to CR status. +func (d *PmemCSIDriver) redeployNodeDriver(r *ReconcileDeployment) (*appsv1.DaemonSet, error) { + ds := &appsv1.DaemonSet{ + TypeMeta: metav1.TypeMeta{Kind: "DaemonSet", APIVersion: "apps/v1"}, + ObjectMeta: d.getObjectMeta(d.nodeDriverName(), false), + } + if err := d.getSubObject(r, ds); err != nil { + return nil, err + } + op := NewObjectPatch(ds, ds.DeepCopy()) + d.getNodeDaemonSet(ds) + if err := d.updateSubObject(r, op); err != nil { + return nil, err + } + + // Update node driver status is status object + status := "NotReady" + reason := "Unknown" + if ds.Status.NumberAvailable == 0 { + reason = "Node daemon set has not started yet." + } else if ds.Status.NumberReady == ds.Status.NumberAvailable { + status = "Ready" + reason = fmt.Sprintf("All %d node driver pod(s) running successfully", ds.Status.NumberAvailable) + } else { + reason = fmt.Sprintf("%d out of %d driver pods are ready", ds.Status.NumberReady, ds.Status.NumberAvailable) + } + d.SetDriverStatus(api.NodeDriver, status, reason) + + return ds, nil +} + +// redeployControllerDriver deploys the controller stateful set and records its status on to CR status. +func (d *PmemCSIDriver) redeployControllerDriver(r *ReconcileDeployment) (*appsv1.StatefulSet, error) { + ss := &appsv1.StatefulSet{ + TypeMeta: metav1.TypeMeta{Kind: "StatefulSet", APIVersion: "apps/v1"}, + ObjectMeta: d.getObjectMeta(d.controllerDriverName(), false), + } + if err := d.getSubObject(r, ss); err != nil { + return nil, err + } + op := NewObjectPatch(ss, ss.DeepCopy()) + d.getControllerStatefulSet(ss) + if err := d.updateSubObject(r, op); err != nil { + return nil, err + } + + // Update controller status is status object + status := "NotReady" + reason := "Unknown" + if ss.Status.Replicas == 0 { + reason = "Controller stateful set has not started yet." 
+ } else if ss.Status.ReadyReplicas == ss.Status.Replicas { + status = "Ready" + reason = fmt.Sprintf("%d instance(s) of controller driver is running successfully", ss.Status.ReadyReplicas) + } else { + reason = fmt.Sprintf("Waiting for stateful set to be ready: %d of %d replicas are ready", + ss.Status.ReadyReplicas, ss.Status.Replicas) + } + d.SetDriverStatus(api.ControllerDriver, status, reason) + + return ss, nil +} + +func (d *PmemCSIDriver) redeployControllerService(r *ReconcileDeployment) (*corev1.Service, error) { + s := &corev1.Service{ + TypeMeta: metav1.TypeMeta{Kind: "Service", APIVersion: "v1"}, + ObjectMeta: d.getObjectMeta(d.controllerServiceName(), false), + } + + if err := d.getSubObject(r, s); err != nil { + return nil, err + } + + op := NewObjectPatch(s, s.DeepCopy()) + d.getControllerService(s) + if err := d.updateSubObject(r, op); err != nil { + return nil, err + } + return s, nil + +} + +func (d *PmemCSIDriver) redeployMetricsService(r *ReconcileDeployment) (*corev1.Service, error) { + o := &corev1.Service{ + TypeMeta: metav1.TypeMeta{Kind: "Service", APIVersion: "v1"}, + ObjectMeta: d.getObjectMeta(d.metricsServiceName(), false), + } + if err := d.getSubObject(r, o); err != nil { + return nil, err + } + op := NewObjectPatch(o, o.DeepCopy()) + d.getMetricsService(o) + if err := d.updateSubObject(r, op); err != nil { + return nil, err + } + return o, nil +} + +func (d *PmemCSIDriver) redeployCSIDriver(r *ReconcileDeployment) (*storagev1beta1.CSIDriver, error) { + o := &storagev1beta1.CSIDriver{ + TypeMeta: metav1.TypeMeta{Kind: "CSIDriver", APIVersion: "storage.k8s.io/v1beta1"}, + ObjectMeta: d.getObjectMeta(d.csiDriverName(), true), + } + if err := d.getSubObject(r, o); err != nil { + return nil, err + } + op := NewObjectPatch(o, o.DeepCopy()) + d.getCSIDriver(o) + if err := d.updateSubObject(r, op); err != nil { + return nil, err + } + return o, nil +} + +func (d *PmemCSIDriver) redeployProvisionerRole(r *ReconcileDeployment) (*rbacv1.Role, error) { + o := &rbacv1.Role{ + TypeMeta: metav1.TypeMeta{Kind: "Role", APIVersion: "rbac.authorization.k8s.io/v1"}, + ObjectMeta: d.getObjectMeta(d.provisionerRoleName(), false), + } + if err := d.getSubObject(r, o); err != nil { + return nil, err + } + op := NewObjectPatch(o, o.DeepCopy()) + d.getControllerProvisionerRole(o) + if err := d.updateSubObject(r, op); err != nil { + return nil, err + } + return o, nil +} + +func (d *PmemCSIDriver) redeployProvisionerRoleBinding(r *ReconcileDeployment) (*rbacv1.RoleBinding, error) { + o := &rbacv1.RoleBinding{ + TypeMeta: metav1.TypeMeta{Kind: "RoleBinding", APIVersion: "rbac.authorization.k8s.io/v1"}, + ObjectMeta: d.getObjectMeta(d.provisionerRoleBindingName(), false), + } + if err := d.getSubObject(r, o); err != nil { + return nil, err + } + op := NewObjectPatch(o, o.DeepCopy()) + d.getControllerProvisionerRoleBinding(o) + if err := d.updateSubObject(r, op); err != nil { + return nil, err + } + return o, nil +} + +func (d *PmemCSIDriver) redeployProvisionerClusterRole(r *ReconcileDeployment) (*rbacv1.ClusterRole, error) { + o := &rbacv1.ClusterRole{ + TypeMeta: metav1.TypeMeta{Kind: "ClusterRole", APIVersion: "rbac.authorization.k8s.io/v1"}, + ObjectMeta: d.getObjectMeta(d.provisionerClusterRoleName(), true), + } + if err := d.getSubObject(r, o); err != nil { + return nil, err + } + op := NewObjectPatch(o, o.DeepCopy()) + d.getControllerProvisionerClusterRole(o) + if err := d.updateSubObject(r, op); err != nil { + return nil, err + } + return o, nil + +} + +func (d *PmemCSIDriver) 
redeployProvisionerClusterRoleBinding(r *ReconcileDeployment) (*rbacv1.ClusterRoleBinding, error) { + o := &rbacv1.ClusterRoleBinding{ + TypeMeta: metav1.TypeMeta{Kind: "ClusterRoleBinding", APIVersion: "rbac.authorization.k8s.io/v1"}, + ObjectMeta: d.getObjectMeta(d.provisionerClusterRoleBindingName(), true), + } + if err := d.getSubObject(r, o); err != nil { + return nil, err + } + op := NewObjectPatch(o, o.DeepCopy()) + d.getControllerProvisionerClusterRoleBinding(o) + if err := d.updateSubObject(r, op); err != nil { + return nil, err + } + return o, nil +} + +func (d *PmemCSIDriver) redeployServiceAccount(r *ReconcileDeployment) (*corev1.ServiceAccount, error) { + o := &corev1.ServiceAccount{ + TypeMeta: metav1.TypeMeta{Kind: "ServiceAccount", APIVersion: "v1"}, + ObjectMeta: d.getObjectMeta(d.serviceAccountName(), false), + } + if err := d.getSubObject(r, o); err != nil { + return nil, err + } + op := NewObjectPatch(o, o.DeepCopy()) + /* nothing to customize for service account */ + if err := d.updateSubObject(r, op); err != nil { + return nil, err + } + return o, nil +} + +// HandleEvent handles the delete/update events received on sub-objects +func (d *PmemCSIDriver) HandleEvent(meta metav1.Object, obj apiruntime.Object, r *ReconcileDeployment) error { + switch obj.(type) { + case *corev1.Secret: + klog.V(4).Infof("Redeploying driver secrets") + if _, err := d.redeploySecrets(r); err != nil { + return fmt.Errorf("failed to redeploy %q secrets: %v", meta.GetName(), err) + } + case *appsv1.DaemonSet: + klog.V(4).Infof("Redeploying node driver") + org := d.DeepCopy() + if _, err := d.redeployNodeDriver(r); err != nil { + return fmt.Errorf("failed to redeploy node driver: %v", err) + } + if err := r.PatchDeploymentStatus(d.Deployment, client.MergeFrom(org)); err != nil { + return fmt.Errorf("failed to update deployment CR status: %v", err) + } + case *appsv1.StatefulSet: + klog.V(4).Infof("Redeploying controller driver") + org := d.DeepCopy() + if _, err := d.redeployControllerDriver(r); err != nil { + return fmt.Errorf("failed to redeploy controller driver: %v", err) + } + if err := r.PatchDeploymentStatus(d.Deployment, client.MergeFrom(org)); err != nil { + return fmt.Errorf("failed to update deployment CR status: %v", err) + } + case *rbacv1.Role: + klog.V(4).Infof("Redeploying provisioner RBAC role: %q", meta.GetName()) + if _, err := d.redeployProvisionerRole(r); err != nil { + return fmt.Errorf("failed to redeploy %q provisioner role: %v", meta.GetName(), err) + } + case *rbacv1.ClusterRole: + klog.V(4).Infof("Redeploying provisioner cluster role: %q", meta.GetName()) + if _, err := d.redeployProvisionerClusterRole(r); err != nil { + return fmt.Errorf("failed to redeploy %q cluster role: %v", meta.GetName(), err) + } + case *rbacv1.RoleBinding: + klog.V(4).Infof("Redeploying provisioner role binding") + if _, err := d.redeployProvisionerRoleBinding(r); err != nil { + return fmt.Errorf("failed to redeploy %q role binding: %v", meta.GetName(), err) + } + case *rbacv1.ClusterRoleBinding: + klog.V(4).Infof("Redeploying provisioner cluster role binding: %q", meta.GetName()) + if _, err := d.redeployProvisionerClusterRoleBinding(r); err != nil { + return fmt.Errorf("failed to redeploy %q cluster role binding: %v", meta.GetName(), err) + } + case *corev1.ServiceAccount: + klog.V(4).Infof("Redeploying service account") + if _, err := d.redeployServiceAccount(r); err != nil { + return fmt.Errorf("failed to redeploy %q service account: %v", meta.GetName(), err) + } + case *corev1.Service: + 
var err error + klog.V(4).Infof("Redeploying service: %s", meta.GetName()) + if meta.GetName() == d.controllerServiceName() { + _, err = d.redeployControllerService(r) + } else if meta.GetName() == d.metricsServiceName() { + _, err = d.redeployMetricsService(r) + } + if err != nil { + return fmt.Errorf("failed to redeploy %q service: %v", meta.GetName(), err) + } + case *storagev1beta1.CSIDriver: + klog.V(4).Infof("Redeploying %q CSIDriver", meta.GetName()) + if _, err := d.redeployCSIDriver(r); err != nil { + return fmt.Errorf("failed to redeploy %q CSIDriver: %v", meta.GetName(), err) + } + default: + klog.V(3).Infof("Ignoring event on '%s' of type %T", meta.GetName(), obj) } - return false, nil + return nil } func objectIsObsolete(objList []apiruntime.Object, toFind unstructured.Unstructured) (bool, error) { + klog.V(5).Infof("Checking if %q of type %q is obsolete...", toFind.GetName(), toFind.GetObjectKind().GroupVersionKind()) for i := range objList { metaObj, err := meta.Accessor(objList[i]) if err != nil { @@ -280,6 +595,11 @@ func (d *PmemCSIDriver) isOwnerOf(obj unstructured.Unstructured) bool { } func (d *PmemCSIDriver) deleteObsoleteObjects(r *ReconcileDeployment, newObjects []apiruntime.Object) error { + for _, obj := range newObjects { + metaObj, _ := meta.Accessor(obj) + klog.V(5).Infof("==>%q type %q", metaObj.GetName(), obj.GetObjectKind().GroupVersionKind()) + } + for _, gvk := range AllObjectTypes { list := &unstructured.UnstructuredList{} list.SetGroupVersionKind(gvk) @@ -287,7 +607,7 @@ func (d *PmemCSIDriver) deleteObsoleteObjects(r *ReconcileDeployment, newObjects Namespace: d.namespace, } - klog.Infof("Fetching '%s' list with options: %v", gvk, opts.Namespace) + klog.V(5).Infof("Fetching '%s' list with options: %v", gvk, opts.Namespace) if err := r.client.List(context.TODO(), list, opts); err != nil { return err } @@ -303,6 +623,8 @@ func (d *PmemCSIDriver) deleteObsoleteObjects(r *ReconcileDeployment, newObjects if !obsolete { continue } + klog.V(3).Infof("Deleting obsolete object %q of type '%s'", obj.GetName(), obj.GetObjectKind().GroupVersionKind()) + o, err := scheme.Scheme.New(obj.GetObjectKind().GroupVersionKind()) if err != nil { return err @@ -322,101 +644,109 @@ func (d *PmemCSIDriver) deleteObsoleteObjects(r *ReconcileDeployment, newObjects return nil } -// getDeploymentObjects returns all objects that are part of a driver deployment. 
-func (d *PmemCSIDriver) getDeploymentObjects() ([]apiruntime.Object, error) { - objects, err := d.getSecrets() - if err != nil { - return nil, err - } +func (d *PmemCSIDriver) registrySecretName() string { + return d.GetHyphenedName() + "-registry-secrets" +} - objects = append(objects, - d.getCSIDriver(), - d.getControllerServiceAccount(), - d.getControllerProvisionerRole(), - d.getControllerProvisionerRoleBinding(), - d.getControllerProvisionerClusterRole(), - d.getControllerProvisionerClusterRoleBinding(), - d.getControllerService(), - d.getMetricsService(), - d.getControllerStatefulSet(), - d.getNodeDaemonSet(), - ) - - return objects, nil +func (d *PmemCSIDriver) nodeSecretName() string { + return d.GetHyphenedName() + "-node-secrets" } -func (d *PmemCSIDriver) getSecrets() ([]apiruntime.Object, error) { - // Encoded private keys and certificates - caCert := d.Spec.CACert - registryPrKey := d.Spec.RegistryPrivateKey - ncPrKey := d.Spec.NodeControllerPrivateKey - registryCert := d.Spec.RegistryCert - ncCert := d.Spec.NodeControllerCert - - // sanity check - if caCert == nil { - if registryCert != nil || ncCert != nil { - return nil, fmt.Errorf("incomplete deployment configuration: missing root CA certificate by which the provided certificates are signed") - } - } else if registryCert == nil || registryPrKey == nil || ncCert == nil || ncPrKey == nil { - return nil, fmt.Errorf("incomplete deployment configuration: certificates and corresponding private keys must be provided") - } +func (d *PmemCSIDriver) csiDriverName() string { + return d.GetName() +} - if caCert == nil { - var prKey *rsa.PrivateKey +func (d *PmemCSIDriver) controllerServiceName() string { + return d.GetHyphenedName() + "-controller" +} - ca, err := pmemtls.NewCA(nil, nil) - if err != nil { - return nil, fmt.Errorf("failed to initialize CA: %v", err) - } - caCert = ca.EncodedCertificate() +func (d *PmemCSIDriver) metricsServiceName() string { + return d.GetHyphenedName() + "-metrics" +} - if registryPrKey != nil { - prKey, err = pmemtls.DecodeKey(registryPrKey) - } else { - prKey, err = pmemtls.NewPrivateKey() - registryPrKey = pmemtls.EncodeKey(prKey) - } - if err != nil { - return nil, err - } +func (d *PmemCSIDriver) serviceAccountName() string { + return d.GetHyphenedName() + "-controller" +} - cert, err := ca.GenerateCertificate("pmem-registry", prKey.Public()) - if err != nil { - return nil, fmt.Errorf("failed to generate registry certificate: %v", err) - } - registryCert = pmemtls.EncodeCert(cert) +func (d *PmemCSIDriver) provisionerRoleName() string { + return d.GetHyphenedName() + "-external-provisioner-cfg" +} - if ncPrKey == nil { - prKey, err = pmemtls.NewPrivateKey() - ncPrKey = pmemtls.EncodeKey(prKey) - } else { - prKey, err = pmemtls.DecodeKey(ncPrKey) - } - if err != nil { - return nil, err - } +func (d *PmemCSIDriver) provisionerRoleBindingName() string { + return d.GetHyphenedName() + "-csi-provisioner-role-cfg" +} - cert, err = ca.GenerateCertificate("pmem-node-controller", prKey.Public()) - if err != nil { - return nil, err - } - ncCert = pmemtls.EncodeCert(cert) +func (d *PmemCSIDriver) provisionerClusterRoleName() string { + return d.GetHyphenedName() + "-external-provisioner-runner" +} + +func (d *PmemCSIDriver) provisionerClusterRoleBindingName() string { + return d.GetHyphenedName() + "-csi-provisioner-role" +} + +func (d *PmemCSIDriver) nodeDriverName() string { + return d.GetHyphenedName() + "-node" +} + +func (d *PmemCSIDriver) controllerDriverName() string { + return d.GetHyphenedName() + 
"-controller" +} + +func (d *PmemCSIDriver) getRegistrySecrets(secret *corev1.Secret) { + d.getSecret(secret, "registry-secrets", d.Spec.CACert, d.Spec.RegistryPrivateKey, d.Spec.RegistryCert) +} + +func (d *PmemCSIDriver) getNodeSecrets(secret *corev1.Secret) { + d.getSecret(secret, "node-secrets", d.Spec.CACert, d.Spec.NodeControllerPrivateKey, d.Spec.NodeControllerCert) +} + +func (d *PmemCSIDriver) provisionCertificates() error { + var prKey *rsa.PrivateKey + + klog.Infof("Provisioning new certificates for deployment '%s'", d.Name) + ca, err := pmemtls.NewCA(nil, nil) + if err != nil { + return fmt.Errorf("failed to initialize CA: %v", err) + } + d.Spec.CACert = ca.EncodedCertificate() + + if d.Spec.RegistryPrivateKey != nil { + prKey, err = pmemtls.DecodeKey(d.Spec.RegistryPrivateKey) } else { - // check if the provided certificates are valid - if err := validateCertificates(caCert, registryPrKey, registryCert, ncPrKey, ncCert); err != nil { - return nil, fmt.Errorf("validate CA certificates: %v", err) - } + prKey, err = pmemtls.NewPrivateKey() + d.Spec.RegistryPrivateKey = pmemtls.EncodeKey(prKey) + } + if err != nil { + return err + } + + cert, err := ca.GenerateCertificate("pmem-registry", prKey.Public()) + if err != nil { + return fmt.Errorf("failed to generate registry certificate: %v", err) } + d.Spec.RegistryCert = pmemtls.EncodeCert(cert) + + if d.Spec.NodeControllerPrivateKey == nil { + prKey, err = pmemtls.NewPrivateKey() + d.Spec.NodeControllerPrivateKey = pmemtls.EncodeKey(prKey) + } else { + prKey, err = pmemtls.DecodeKey(d.Spec.NodeControllerPrivateKey) + } + if err != nil { + return err + } + + cert, err = ca.GenerateCertificate("pmem-node-controller", prKey.Public()) + if err != nil { + return err + } + d.Spec.NodeControllerCert = pmemtls.EncodeCert(cert) // Instead of waiting for next GC cycle, initiate garbage collector manually // so that the unneeded CA key, certificate get removed. defer runtime.GC() - return []apiruntime.Object{ - d.getSecret("registry-secrets", caCert, registryPrKey, registryCert), - d.getSecret("node-secrets", caCert, ncPrKey, ncCert), - }, nil + return nil } // validateCertificates ensures that the given keys and certificates are valid @@ -424,7 +754,7 @@ func (d *PmemCSIDriver) getSecrets() ([]apiruntime.Object, error) { // a tls client connection to that sever using the provided keys and certificates. 
// As we use mutual-tls, testing one server is enough to make sure that the provided // certificates works -func validateCertificates(caCert, regKey, regCert, ncKey, ncCert []byte) error { +func (d *PmemCSIDriver) validateCertificates() error { tmp, err := ioutil.TempDir("", "pmem-csi-validate-certs-*") if err != nil { return err @@ -432,12 +762,12 @@ func validateCertificates(caCert, regKey, regCert, ncKey, ncCert []byte) error { defer os.RemoveAll(tmp) // Registry server config - regCfg, err := pmemgrpc.ServerTLS(caCert, regCert, regKey, "pmem-node-controller") + regCfg, err := pmemgrpc.ServerTLS(d.Spec.CACert, d.Spec.RegistryCert, d.Spec.RegistryPrivateKey, "pmem-node-controller") if err != nil { return err } - clientCfg, err := pmemgrpc.ClientTLS(caCert, ncCert, ncKey, "pmem-registry") + clientCfg, err := pmemgrpc.ClientTLS(d.Spec.CACert, d.Spec.NodeControllerCert, d.Spec.NodeControllerPrivateKey, "pmem-registry") if err != nil { return err } @@ -460,20 +790,13 @@ func validateCertificates(caCert, regKey, regCert, ncKey, ncCert []byte) error { return nil } -func (d *PmemCSIDriver) getCSIDriver() *storagev1beta1.CSIDriver { +func (d *PmemCSIDriver) getCSIDriver(csiDriver *storagev1beta1.CSIDriver) { attachRequired := false podInfoOnMount := true - csiDriver := &storagev1beta1.CSIDriver{ - TypeMeta: metav1.TypeMeta{ - Kind: "CSIDriver", - APIVersion: "storage.k8s.io/v1beta1", - }, - ObjectMeta: d.getObjectMeta(d.GetName(), true), - Spec: storagev1beta1.CSIDriverSpec{ - AttachRequired: &attachRequired, - PodInfoOnMount: &podInfoOnMount, - }, + csiDriver.Spec = storagev1beta1.CSIDriverSpec{ + AttachRequired: &attachRequired, + PodInfoOnMount: &podInfoOnMount, } // Volume lifecycle modes are supported only after k8s v1.16 @@ -483,419 +806,331 @@ func (d *PmemCSIDriver) getCSIDriver() *storagev1beta1.CSIDriver { storagev1beta1.VolumeLifecycleEphemeral, } } - - return csiDriver } -func (d *PmemCSIDriver) getSecret(cn string, ca, encodedKey, encodedCert []byte) *corev1.Secret { - return &corev1.Secret{ - TypeMeta: metav1.TypeMeta{ - Kind: "Secret", - APIVersion: "v1", - }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-"+cn, false), - Type: corev1.SecretTypeTLS, - Data: map[string][]byte{ - // Same names as in the example secrets and in the v1 API. - "ca.crt": ca, // no standard name for this one - "tls.key": encodedKey, // v1.TLSPrivateKeyKey - "tls.crt": encodedCert, // v1.TLSCertKey - }, +func (d *PmemCSIDriver) getSecret(secret *corev1.Secret, cn string, ca, encodedKey, encodedCert []byte) { + secret.Type = corev1.SecretTypeTLS + secret.Data = map[string][]byte{ + // Same names as in the example secrets and in the v1 API. 
+ "ca.crt": ca, // no standard name for this one + "tls.key": encodedKey, // v1.TLSPrivateKeyKey + "tls.crt": encodedCert, // v1.TLSCertKey } } -func (d *PmemCSIDriver) getControllerService() *corev1.Service { - return &corev1.Service{ - TypeMeta: metav1.TypeMeta{ - Kind: "Service", - APIVersion: "v1", - }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-controller", false), - Spec: corev1.ServiceSpec{ - Type: corev1.ServiceTypeClusterIP, - Ports: []corev1.ServicePort{ - corev1.ServicePort{ - Port: controllerServicePort, - TargetPort: intstr.IntOrString{ - IntVal: controllerServicePort, - }, - }, - }, - Selector: map[string]string{ - "app": d.GetHyphenedName() + "-controller", - }, - }, +func (d *PmemCSIDriver) getService(service *corev1.Service, t corev1.ServiceType, port int32) { + service.Spec.Type = t + if service.Spec.Ports == nil { + service.Spec.Ports = append(service.Spec.Ports, corev1.ServicePort{}) + } + service.Spec.Ports[0].Port = port + service.Spec.Ports[0].TargetPort = intstr.IntOrString{ + IntVal: port, + } + service.Spec.Selector = map[string]string{ + "app": d.GetHyphenedName() + "-controller", } } -func (d *PmemCSIDriver) getMetricsService() *corev1.Service { - return &corev1.Service{ - TypeMeta: metav1.TypeMeta{ - Kind: "Service", - APIVersion: "v1", - }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-metrics", false), - Spec: corev1.ServiceSpec{ - Type: corev1.ServiceTypeNodePort, - Ports: []corev1.ServicePort{ - corev1.ServicePort{ - Port: controllerMetricsPort, - TargetPort: intstr.IntOrString{ - IntVal: controllerMetricsPort, - }, - }, - }, - Selector: map[string]string{ - "app": d.GetHyphenedName() + "-controller", - }, - }, - } +func (d *PmemCSIDriver) getControllerService(service *corev1.Service) { + d.getService(service, corev1.ServiceTypeClusterIP, controllerServicePort) } -func (d *PmemCSIDriver) getControllerServiceAccount() *corev1.ServiceAccount { - return &corev1.ServiceAccount{ - TypeMeta: metav1.TypeMeta{ - Kind: "ServiceAccount", - APIVersion: "v1", - }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-controller", false), - } +func (d *PmemCSIDriver) getMetricsService(service *corev1.Service) { + d.getService(service, corev1.ServiceTypeNodePort, controllerMetricsPort) } -func (d *PmemCSIDriver) getControllerProvisionerRole() *rbacv1.Role { - return &rbacv1.Role{ - TypeMeta: metav1.TypeMeta{ - Kind: "Role", - APIVersion: "rbac.authorization.k8s.io/v1", - }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-external-provisioner-cfg", false), - Rules: []rbacv1.PolicyRule{ - rbacv1.PolicyRule{ - APIGroups: []string{""}, - Resources: []string{"endpoints"}, - Verbs: []string{ - "get", "watch", "list", "delete", "update", "create", - }, +func (d *PmemCSIDriver) getControllerProvisionerRole(role *rbacv1.Role) { + role.Rules = []rbacv1.PolicyRule{ + rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"endpoints"}, + Verbs: []string{ + "get", "watch", "list", "delete", "update", "create", }, - rbacv1.PolicyRule{ - APIGroups: []string{"coordination.k8s.io"}, - Resources: []string{"leases"}, - Verbs: []string{ - "get", "watch", "list", "delete", "update", "create", - }, + }, + rbacv1.PolicyRule{ + APIGroups: []string{"coordination.k8s.io"}, + Resources: []string{"leases"}, + Verbs: []string{ + "get", "watch", "list", "delete", "update", "create", }, }, } } -func (d *PmemCSIDriver) getControllerProvisionerRoleBinding() *rbacv1.RoleBinding { - return &rbacv1.RoleBinding{ - TypeMeta: metav1.TypeMeta{ - Kind: "RoleBinding", - APIVersion: 
"rbac.authorization.k8s.io/v1", - }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-csi-provisioner-role-cfg", false), - Subjects: []rbacv1.Subject{ - { - Kind: "ServiceAccount", - Name: d.GetHyphenedName() + "-controller", - Namespace: d.namespace, - }, - }, - RoleRef: rbacv1.RoleRef{ - APIGroup: "rbac.authorization.k8s.io", - Kind: "Role", - Name: d.GetHyphenedName() + "-external-provisioner-cfg", +func (d *PmemCSIDriver) getControllerProvisionerRoleBinding(rb *rbacv1.RoleBinding) { + rb.Subjects = []rbacv1.Subject{ + { + Kind: "ServiceAccount", + Name: d.GetHyphenedName() + "-controller", + Namespace: d.namespace, }, } + rb.RoleRef = rbacv1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "Role", + Name: d.GetHyphenedName() + "-external-provisioner-cfg", + } } -func (d *PmemCSIDriver) getControllerProvisionerClusterRole() *rbacv1.ClusterRole { - return &rbacv1.ClusterRole{ - TypeMeta: metav1.TypeMeta{ - Kind: "ClusterRole", - APIVersion: "rbac.authorization.k8s.io/v1", - }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-external-provisioner-runner", true), - Rules: []rbacv1.PolicyRule{ - rbacv1.PolicyRule{ - APIGroups: []string{""}, - Resources: []string{"persistentvolumes"}, - Verbs: []string{ - "get", "watch", "list", "delete", "create", - }, +func (d *PmemCSIDriver) getControllerProvisionerClusterRole(cr *rbacv1.ClusterRole) { + cr.Rules = []rbacv1.PolicyRule{ + rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"persistentvolumes"}, + Verbs: []string{ + "get", "watch", "list", "delete", "create", }, - rbacv1.PolicyRule{ - APIGroups: []string{""}, - Resources: []string{"persistentvolumeclaims"}, - Verbs: []string{ - "get", "watch", "list", "update", - }, + }, + rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"persistentvolumeclaims"}, + Verbs: []string{ + "get", "watch", "list", "update", }, - rbacv1.PolicyRule{ - APIGroups: []string{"storage.k8s.io"}, - Resources: []string{"storageclasses"}, - Verbs: []string{ - "get", "watch", "list", - }, + }, + rbacv1.PolicyRule{ + APIGroups: []string{"storage.k8s.io"}, + Resources: []string{"storageclasses"}, + Verbs: []string{ + "get", "watch", "list", }, - rbacv1.PolicyRule{ - APIGroups: []string{""}, - Resources: []string{"events"}, - Verbs: []string{ - "watch", "list", "create", "update", "patch", - }, + }, + rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"events"}, + Verbs: []string{ + "watch", "list", "create", "update", "patch", }, - rbacv1.PolicyRule{ - APIGroups: []string{"snapshot.storage.k8s.io"}, - Resources: []string{"volumesnapshots", "volumesnapshotcontents"}, - Verbs: []string{ - "get", "list", - }, + }, + rbacv1.PolicyRule{ + APIGroups: []string{"snapshot.storage.k8s.io"}, + Resources: []string{"volumesnapshots", "volumesnapshotcontents"}, + Verbs: []string{ + "get", "list", }, - rbacv1.PolicyRule{ - APIGroups: []string{"storage.k8s.io"}, - Resources: []string{"csinodes"}, - Verbs: []string{ - "get", "list", "watch", - }, + }, + rbacv1.PolicyRule{ + APIGroups: []string{"storage.k8s.io"}, + Resources: []string{"csinodes"}, + Verbs: []string{ + "get", "list", "watch", }, - rbacv1.PolicyRule{ - APIGroups: []string{""}, - Resources: []string{"nodes"}, - Verbs: []string{ - "get", "list", "watch", - }, + }, + rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"nodes"}, + Verbs: []string{ + "get", "list", "watch", }, }, } } -func (d *PmemCSIDriver) getControllerProvisionerClusterRoleBinding() *rbacv1.ClusterRoleBinding { - return 
&rbacv1.ClusterRoleBinding{ - TypeMeta: metav1.TypeMeta{ - Kind: "ClusterRoleBinding", - APIVersion: "rbac.authorization.k8s.io/v1", - }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-csi-provisioner-role", true), - Subjects: []rbacv1.Subject{ - { - Kind: "ServiceAccount", - Name: d.GetHyphenedName() + "-controller", - Namespace: d.namespace, - }, - }, - RoleRef: rbacv1.RoleRef{ - APIGroup: "rbac.authorization.k8s.io", - Kind: "ClusterRole", - Name: d.GetHyphenedName() + "-external-provisioner-runner", +func (d *PmemCSIDriver) getControllerProvisionerClusterRoleBinding(crb *rbacv1.ClusterRoleBinding) { + crb.Subjects = []rbacv1.Subject{ + { + Kind: "ServiceAccount", + Name: d.serviceAccountName(), // GetHyphenedName() + "-controller", + Namespace: d.namespace, }, } + crb.RoleRef = rbacv1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "ClusterRole", + Name: d.provisionerClusterRoleName(), // d.GetHyphenedName() + "-external-provisioner-runner", + } } -func (d *PmemCSIDriver) getControllerStatefulSet() *appsv1.StatefulSet { +func (d *PmemCSIDriver) getControllerStatefulSet(ss *appsv1.StatefulSet) { replicas := int32(1) true := true pmemcsiUser := int64(1000) - ss := &appsv1.StatefulSet{ - TypeMeta: metav1.TypeMeta{ - Kind: "StatefulSet", - APIVersion: "apps/v1", + + ss.Spec = appsv1.StatefulSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": d.GetHyphenedName() + "-controller", + }, }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-controller", false), - Spec: appsv1.StatefulSetSpec{ - Replicas: &replicas, - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "app": d.GetHyphenedName() + "-controller", + ServiceName: d.GetHyphenedName() + "-controller", + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: joinMaps( + d.Spec.Labels, + map[string]string{ + "app": d.GetHyphenedName() + "-controller", + "pmem-csi.intel.com/webhook": "ignore", + }), + Annotations: map[string]string{ + "pmem-csi.intel.com/scrape": "containers", }, }, - ServiceName: d.GetHyphenedName() + "-controller", - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: joinMaps( - d.Spec.Labels, - map[string]string{ - "app": d.GetHyphenedName() + "-controller", - "pmem-csi.intel.com/webhook": "ignore", - }), - Annotations: map[string]string{ - "pmem-csi.intel.com/scrape": "containers", - }, + Spec: corev1.PodSpec{ + SecurityContext: &corev1.PodSecurityContext{ + // Controller pod must run as non-root user + RunAsNonRoot: &true, + RunAsUser: &pmemcsiUser, }, - Spec: corev1.PodSpec{ - SecurityContext: &corev1.PodSecurityContext{ - // Controller pod must run as non-root user - RunAsNonRoot: &true, - RunAsUser: &pmemcsiUser, - }, - ServiceAccountName: d.GetHyphenedName() + "-controller", - Containers: []corev1.Container{ - d.getControllerContainer(), - d.getProvisionerContainer(), - }, - Affinity: &corev1.Affinity{ - NodeAffinity: &corev1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ - NodeSelectorTerms: []corev1.NodeSelectorTerm{ - { - MatchExpressions: []corev1.NodeSelectorRequirement{ - // By default, the controller will run anywhere in the cluster. - // If that isn't desired, the "pmem-csi.intel.com/controller" label - // can be set to "no" or "false" for a node to prevent the controller - // from running there. 
- // - // This is used during testing as a workaround for a particular issue - // on Clear Linux where network configuration randomly fails such that - // the driver which runs on the same node as the controller cannot - // connect to the controller (https://github.com/intel/pmem-csi/issues/555). - // - // It may also be useful for other purposes, in particular for deployment - // through the operator: it has the same rule and currently no other API for - // setting affinity. - { - Key: "pmem-csi.intel.com/controller", - Operator: corev1.NodeSelectorOpNotIn, - Values: []string{"no", "false"}, - }, + ServiceAccountName: d.GetHyphenedName() + "-controller", + Containers: []corev1.Container{ + d.getControllerContainer(), + d.getProvisionerContainer(), + }, + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + // By default, the controller will run anywhere in the cluster. + // If that isn't desired, the "pmem-csi.intel.com/controller" label + // can be set to "no" or "false" for a node to prevent the controller + // from running there. + // + // This is used during testing as a workaround for a particular issue + // on Clear Linux where network configuration randomly fails such that + // the driver which runs on the same node as the controller cannot + // connect to the controller (https://github.com/intel/pmem-csi/issues/555). + // + // It may also be useful for other purposes, in particular for deployment + // through the operator: it has the same rule and currently no other API for + // setting affinity. + { + Key: "pmem-csi.intel.com/controller", + Operator: corev1.NodeSelectorOpNotIn, + Values: []string{"no", "false"}, }, }, }, }, }, }, - Volumes: []corev1.Volume{ - { - Name: "plugin-socket-dir", - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{}, - }, + }, + Volumes: []corev1.Volume{ + { + Name: "plugin-socket-dir", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, }, - { - Name: "registry-cert", - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: d.GetHyphenedName() + "-registry-secrets", - }, + }, + { + Name: "registry-cert", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: d.GetHyphenedName() + "-registry-secrets", }, }, - { - Name: "tmp-dir", - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{}, - }, + }, + { + Name: "tmp-dir", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }, }, }, }, } - - return ss } -func (d *PmemCSIDriver) getNodeDaemonSet() *appsv1.DaemonSet { +func (d *PmemCSIDriver) getNodeDaemonSet(ds *appsv1.DaemonSet) { directoryOrCreate := corev1.HostPathDirectoryOrCreate - ds := &appsv1.DaemonSet{ - TypeMeta: metav1.TypeMeta{ - Kind: "DaemonSet", - APIVersion: "apps/v1", + ds.Spec = appsv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": d.GetHyphenedName() + "-node", + }, }, - ObjectMeta: d.getObjectMeta(d.GetHyphenedName()+"-node", false), - Spec: appsv1.DaemonSetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "app": d.GetHyphenedName() + "-node", + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: joinMaps( + d.Spec.Labels, + map[string]string{ + "app": 
d.GetHyphenedName() + "-node", + "pmem-csi.intel.com/webhook": "ignore", + }), + Annotations: map[string]string{ + "pmem-csi.intel.com/scrape": "containers", }, }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: joinMaps( - d.Spec.Labels, - map[string]string{ - "app": d.GetHyphenedName() + "-node", - "pmem-csi.intel.com/webhook": "ignore", - }), - Annotations: map[string]string{ - "pmem-csi.intel.com/scrape": "containers", - }, + Spec: corev1.PodSpec{ + NodeSelector: d.Spec.NodeSelector, + Containers: []corev1.Container{ + d.getNodeDriverContainer(), + d.getNodeRegistrarContainer(), }, - Spec: corev1.PodSpec{ - NodeSelector: d.Spec.NodeSelector, - Containers: []corev1.Container{ - d.getNodeDriverContainer(), - d.getNodeRegistrarContainer(), - }, - Volumes: []corev1.Volume{ - { - Name: "socket-dir", - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: d.Spec.KubeletDir + "/plugins/" + d.GetName(), - Type: &directoryOrCreate, - }, + Volumes: []corev1.Volume{ + { + Name: "socket-dir", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: d.Spec.KubeletDir + "/plugins/" + d.GetName(), + Type: &directoryOrCreate, }, }, - { - Name: "registration-dir", - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: d.Spec.KubeletDir + "/plugins_registry/", - Type: &directoryOrCreate, - }, + }, + { + Name: "registration-dir", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: d.Spec.KubeletDir + "/plugins_registry/", + Type: &directoryOrCreate, }, }, - { - Name: "mountpoint-dir", - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: d.Spec.KubeletDir + "/plugins/kubernetes.io/csi", - Type: &directoryOrCreate, - }, + }, + { + Name: "mountpoint-dir", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: d.Spec.KubeletDir + "/plugins/kubernetes.io/csi", + Type: &directoryOrCreate, }, }, - { - Name: "pods-dir", - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: d.Spec.KubeletDir + "/pods", - Type: &directoryOrCreate, - }, + }, + { + Name: "pods-dir", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: d.Spec.KubeletDir + "/pods", + Type: &directoryOrCreate, }, }, - { - Name: "node-cert", - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: d.GetHyphenedName() + "-node-secrets", - }, + }, + { + Name: "node-cert", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: d.GetHyphenedName() + "-node-secrets", }, }, - { - Name: "pmem-state-dir", - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: "/var/lib/" + d.GetName(), - Type: &directoryOrCreate, - }, + }, + { + Name: "pmem-state-dir", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/var/lib/" + d.GetName(), + Type: &directoryOrCreate, }, }, - { - Name: "dev-dir", - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: "/dev", - Type: &directoryOrCreate, - }, + }, + { + Name: "dev-dir", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/dev", + Type: &directoryOrCreate, }, }, - { - Name: "sys-dir", - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: "/sys", - Type: &directoryOrCreate, - }, + }, + { + Name: "sys-dir", + 
+						VolumeSource: corev1.VolumeSource{
+							HostPath: &corev1.HostPathVolumeSource{
+								Path: "/sys",
+								Type: &directoryOrCreate,
 							},
 						},
 					},
@@ -903,8 +1138,6 @@ func (d *PmemCSIDriver) getNodeDaemonSet() *appsv1.DaemonSet {
 			},
 		},
 	}
-
-	return ds
 }

 func (d *PmemCSIDriver) getControllerCommand() []string {
@@ -1143,6 +1376,7 @@ func (d *PmemCSIDriver) getMetricsPorts(port int32) []corev1.ContainerPort {
 		{
 			Name:          "metrics",
 			ContainerPort: port,
+			Protocol:      "TCP",
 		},
 	}
 }
@@ -1153,7 +1387,6 @@ func (d *PmemCSIDriver) getObjectMeta(name string, isClusterResource bool) metav
 	{
 		OwnerReferences: []metav1.OwnerReference{
 			d.GetOwnerReference(),
 		},
-		Labels: d.Spec.Labels,
 	}
 	if !isClusterResource {
 		meta.Namespace = d.namespace
diff --git a/pkg/pmem-csi-operator/controller/deployment/deployment_controller.go b/pkg/pmem-csi-operator/controller/deployment/deployment_controller.go
index 879d4a8ed7..0e3cbd7cf0 100644
--- a/pkg/pmem-csi-operator/controller/deployment/deployment_controller.go
+++ b/pkg/pmem-csi-operator/controller/deployment/deployment_controller.go
@@ -10,18 +10,22 @@ import (
 	"context"
 	"fmt"
 	"os"
-	"reflect"
+	"sync"
 	"time"

 	pmemcsiv1alpha1 "github.com/intel/pmem-csi/pkg/apis/pmemcsi/v1alpha1"
 	"github.com/intel/pmem-csi/pkg/k8sutil"
 	pmemcontroller "github.com/intel/pmem-csi/pkg/pmem-csi-operator/controller"
 	"github.com/intel/pmem-csi/pkg/version"
+	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
+	rbacv1 "k8s.io/api/rbac/v1"
+	storagev1beta1 "k8s.io/api/storage/v1beta1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	apiruntime "k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/kubernetes"
 	v1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/tools/record"
@@ -29,8 +33,10 @@ import (
 	"k8s.io/kubectl/pkg/scheme"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
+	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 	"sigs.k8s.io/controller-runtime/pkg/source"
 )
@@ -47,11 +53,11 @@ func Add(mgr manager.Manager, opts pmemcontroller.ControllerOptions) error {
 	if err != nil {
 		return err
 	}
-	return add(mgr, r)
+	return add(mgr, r.(*ReconcileDeployment))
 }

 // add adds a new Controller to mgr with r as the reconcile.Reconciler
-func add(mgr manager.Manager, r reconcile.Reconciler) error {
+func add(mgr manager.Manager, r *ReconcileDeployment) error {
 	// Create a new controller
 	c, err := controller.New("deployment-controller", mgr, controller.Options{Reconciler: r})
 	if err != nil {
@@ -59,13 +65,97 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
 		return err
 	}

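+	// Filter events for the primary Deployment resource before they reach the
+	// reconciler: an update triggers a reconcile only when the spec (generation)
+	// changed, and a deletion merely cleans up the cached copy.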
+	p := predicate.Funcs{
+		UpdateFunc: func(e event.UpdateEvent) bool {
+			klog.V(4).Infof("Got deployment update event on: %s(Type: %T)", e.MetaOld.GetName(), e.MetaOld)
+			old := e.ObjectOld.(*pmemcsiv1alpha1.Deployment)
+			new := e.ObjectNew.(*pmemcsiv1alpha1.Deployment)
+			if old.GetGeneration() == new.GetGeneration() {
+				// No changes registered
+				return false
+			}
+			// We are interested only in spec changes, not CR status changes.
+			changes := old.Compare(new)
+			klog.Infof("CR changes: %v", changes)
+			return len(changes) != 0
+		},
+		DeleteFunc: func(e event.DeleteEvent) bool {
+			klog.Infof("Deployment '%s' deleted, removing local reference", e.Meta.GetName())
+			// Deployment CR deleted; remove its reference from the cache.
+			// Objects owned by it are automatically garbage collected.
+			r.deleteDeployment(e.Meta.GetName())
+			// We already handled the event here,
+			// so no further reconcile is required.
+			return false
+		},
+	}
+
 	// Watch for changes to primary resource Deployment
-	err = c.Watch(&source.Kind{Type: &pmemcsiv1alpha1.Deployment{}}, &handler.EnqueueRequestForObject{})
-	if err != nil {
+	if err := c.Watch(&source.Kind{Type: &pmemcsiv1alpha1.Deployment{}}, &handler.EnqueueRequestForObject{}, p); err != nil {
 		klog.Errorf("Deployment.Add: watch error: %v", err)
 		return err
 	}

+	// Predicate functions for sub-object changes.
+	// We do not want to pollute the deployment reconcile loop with
+	// sub-object changes; instead we handle them individually here.
+	// Hence all these event handlers return 'false' so that the event
+	// is not propagated further.
+	// The one exception: if we fail to handle the event here, we pass it
+	// on to the reconcile loop, which recognizes such requests and just
+	// requeues them so that the failure gets retried.
+	eventFunc := func(event string, meta metav1.Object, obj apiruntime.Object) bool {
+		// Get the owning deployment
+		d, err := r.getDeploymentFor(meta)
+		if err != nil {
+			// The owner might have been deleted already;
+			// we can safely ignore this event.
+			return false
+		}
+		if err := d.HandleEvent(meta, obj, r); err != nil {
+			klog.Warningf("Error occurred while handling %s event: %v", event, err)
+			return true
+		}
+		return false
+	}
+	sop := predicate.Funcs{
+		DeleteFunc: func(e event.DeleteEvent) bool {
+			klog.V(4).Infof("'%s' of type %T deleted", e.Meta.GetName(), e.Object)
+			return eventFunc("delete", e.Meta, e.Object)
+		},
+		UpdateFunc: func(e event.UpdateEvent) bool {
+			klog.V(4).Infof("'%s' of type %T updated", e.MetaOld.GetName(), e.ObjectOld)
+			return eventFunc("update", e.MetaOld, e.ObjectOld)
+		},
+		CreateFunc: func(e event.CreateEvent) bool {
+			return false
+		},
+	}
+
+	// All possible object types for a Deployment CR
+	// that would require redeploying on change
+	subresources := []runtime.Object{
+		&corev1.Secret{},
+		&rbacv1.Role{},
+		&rbacv1.RoleBinding{},
+		&rbacv1.ClusterRole{},
+		&rbacv1.ClusterRoleBinding{},
+		&corev1.Service{},
+		&storagev1beta1.CSIDriver{},
+		&appsv1.StatefulSet{},
+		&appsv1.DaemonSet{},
+	}
+
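+	// Watch each sub-object type with an owner-based handler; the predicate
+	// above already handles the event, so the reconcile loop is only reached
+	// when that handling failed.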
+	for _, resource := range subresources {
+		if err := c.Watch(&source.Kind{Type: resource}, &handler.EnqueueRequestForOwner{
+			IsController: true,
+			OwnerType:    &pmemcsiv1alpha1.Deployment{},
+		}, sop); err != nil {
+			klog.Errorf("Deployment.Add: watch error: %v", err)
+			return err
+		}
+	}
+
 	return nil
 }

@@ -85,8 +175,9 @@ type ReconcileDeployment struct {
 	// container image used for deploying the operator
 	containerImage string
 	// known deployments
-	deployments    map[string]*pmemcsiv1alpha1.Deployment
-	reconcileHooks map[ReconcileHook]struct{}
+	deployments      map[string]*pmemcsiv1alpha1.Deployment
+	deploymentsMutex sync.Mutex
+	reconcileHooks   map[ReconcileHook]struct{}
 }

 // NewReconcileDeployment creates new deployment reconciler
@@ -117,14 +208,15 @@ func NewReconcileDeployment(client client.Client, opts pmemcontroller.Controller
 	evRecorder := evBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "pmem-csi-operator"})

 	return &ReconcileDeployment{
-		client:         client,
-		evBroadcaster:  evBroadcaster,
-		evRecorder:     evRecorder,
-		k8sVersion:     opts.K8sVersion,
-		namespace:      opts.Namespace,
-		containerImage: opts.DriverImage,
-		deployments:    map[string]*pmemcsiv1alpha1.Deployment{},
-		reconcileHooks: map[ReconcileHook]struct{}{},
+		client:           client,
+		evBroadcaster:    evBroadcaster,
+		evRecorder:       evRecorder,
+		k8sVersion:       opts.K8sVersion,
+		namespace:        opts.Namespace,
+		containerImage:   opts.DriverImage,
+		deployments:      map[string]*pmemcsiv1alpha1.Deployment{},
+		deploymentsMutex: sync.Mutex{},
+		reconcileHooks:   map[ReconcileHook]struct{}{},
 	}, nil
 }

@@ -137,30 +229,20 @@ func (r *ReconcileDeployment) Reconcile(request reconcile.Request) (reconcile.Re
 	var requeue bool
 	var err error
-	requeueDelay := 1 * time.Minute
 	requeueDelayOnError := 2 * time.Minute

 	// Fetch the Deployment instance
 	deployment := &pmemcsiv1alpha1.Deployment{}
 	err = r.client.Get(context.TODO(), request.NamespacedName, deployment)
 	if err != nil {
-		if errors.IsNotFound(err) {
-			// Request object not found, could have been deleted after reconcile request.
-			// Owned objects are automatically garbage collected.
-			// Remove the reference from our records
-			klog.Infof("Deployment '%s' deleted, removing local reference", request.Name)
-			delete(r.deployments, request.Name)
-			return reconcile.Result{}, nil
-		}
-		// Error reading the object - requeue the request.
+		klog.V(3).Infof("Failed to retrieve object '%s' to reconcile", request.Name)
+		// One reason for this could be a failed predicate event handler for
+		// a sub-object, so requeue the request so that the same predicate
+		// handler can be called again for that object.
 		return reconcile.Result{Requeue: requeue, RequeueAfter: requeueDelayOnError}, err
 	}

-	for f := range r.reconcileHooks {
-		if f != nil {
-			(*f)(deployment)
-		}
-	}
+	klog.Infof("Reconciling deployment %q", deployment.GetName())

 	// If the deployment has already been marked for deletion,
 	// then we don't need to do anything for it because the
@@ -170,37 +252,51 @@ func (r *ReconcileDeployment) Reconcile(request reconcile.Request) (reconcile.Re
 		return reconcile.Result{Requeue: false}, nil
 	}

-	patch := client.MergeFrom(deployment.DeepCopy())
-	d := &PmemCSIDriver{deployment, r.namespace, r.k8sVersion}
+	for f := range r.reconcileHooks {
+		if f != nil {
+			(*f)(deployment)
+		}
+	}
+
+	if deployment.Status.Phase == pmemcsiv1alpha1.DeploymentPhaseNew {
+		/* New deployment */
+		r.evRecorder.Event(deployment, corev1.EventTypeNormal, pmemcsiv1alpha1.EventReasonNew, "Processing new driver deployment")
+	}
+
+	// Cache the deployment
+	r.saveDeployment(deployment)
+
+	dep := deployment.DeepCopy()

 	// update status
 	defer func() {
-		klog.Infof("Updating deployment status....")
-		d.Deployment.Status.LastUpdated = metav1.Now()
-		// Passing copy of CR to patch as the fake client used in tests
-		// will write back the changes to both status and spec.
-		copy := d.Deployment.DeepCopy()
-		if statusErr := r.client.Status().Patch(context.TODO(), copy, patch); statusErr != nil {
+		klog.Infof("Updating deployment status...")
+		// Some clients (like the fake client used in tests) do not support patching
+		// the status reliably and also write back spec changes. So revert any spec
+		// changes (like deployment defaults) we made; they are not supposed to be
+		// saved on the API server.
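+		// "dep" keeps the status computed by this reconcile run, while its spec
+		// is reset to the spec of the Deployment object retrieved at the start
+		// of the reconcile.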
+		deployment.Spec.DeepCopyInto(&dep.Spec)
+
+		if err := r.PatchDeploymentStatus(dep, client.MergeFrom(deployment.DeepCopy())); err != nil {
 			klog.Warningf("failed to update status %q for deployment %q: %v",
-				d.Deployment.Status.Phase, d.Name, statusErr)
+				dep.Status.Phase, dep.Name, err)
 		}
-		d.Deployment.Status = copy.Status
-	}()
-	requeue, err = d.Reconcile(r)
+		klog.Infof("End of reconcile.")
+	}()
-	klog.Infof("Requeue: %t, error: %v", requeue, err)
+	d := &PmemCSIDriver{dep, r.namespace, r.k8sVersion}
+	if err := d.Reconcile(r); err != nil {
+		klog.Infof("Reconcile error: %v", err)
+		dep.Status.Phase = pmemcsiv1alpha1.DeploymentPhaseFailed
+		r.evRecorder.Event(dep, corev1.EventTypeWarning, pmemcsiv1alpha1.EventReasonFailed, err.Error())
-	if !requeue {
-		return reconcile.Result{}, err
+		return reconcile.Result{Requeue: true, RequeueAfter: requeueDelayOnError}, err
 	}
-	delay := requeueDelay
-	if err != nil {
-		delay = requeueDelayOnError
-	}
+	dep.Status.Phase = pmemcsiv1alpha1.DeploymentPhaseRunning
+	r.evRecorder.Event(dep, corev1.EventTypeNormal, pmemcsiv1alpha1.EventReasonRunning, "Driver deployment successful")
-	return reconcile.Result{Requeue: requeue, RequeueAfter: delay}, err
+	return reconcile.Result{}, nil
 }

 func (r *ReconcileDeployment) Namespace() string {
@@ -303,6 +399,64 @@ func (r *ReconcileDeployment) Delete(obj runtime.Object) error {
 	return r.client.Delete(context.TODO(), obj)
 }

+// PatchDeploymentStatus patches the given deployment CR status
+func (r *ReconcileDeployment) PatchDeploymentStatus(dep *pmemcsiv1alpha1.Deployment, patch client.Patch) error {
+	dep.Status.LastUpdated = metav1.Now()
+	// Passing a copy of the CR to patch as the fake client used in tests
+	// will write back the changes to both status and spec.
+	if err := r.client.Status().Patch(context.TODO(), dep.DeepCopy(), patch); err != nil {
+		return err
+	}
+	// update the status of the cached deployment
+	r.cacheDeploymentStatus(dep.GetName(), dep.Status)
+	return nil
+}
+
+func (r *ReconcileDeployment) saveDeployment(d *pmemcsiv1alpha1.Deployment) {
+	r.deploymentsMutex.Lock()
+	defer r.deploymentsMutex.Unlock()
+	r.deployments[d.Name] = d
+}
+
+func (r *ReconcileDeployment) getDeployment(name string) *pmemcsiv1alpha1.Deployment {
+	r.deploymentsMutex.Lock()
+	defer r.deploymentsMutex.Unlock()
+	return r.deployments[name]
+}
+
+func (r *ReconcileDeployment) deleteDeployment(name string) {
+	r.deploymentsMutex.Lock()
+	defer r.deploymentsMutex.Unlock()
+	delete(r.deployments, name)
+}
+
+func (r *ReconcileDeployment) cacheDeploymentStatus(name string, status pmemcsiv1alpha1.DeploymentStatus) {
+	r.deploymentsMutex.Lock()
+	defer r.deploymentsMutex.Unlock()
+	if d, ok := r.deployments[name]; ok {
+		status.DeepCopyInto(&d.Status)
+	}
+}
+
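+// getDeploymentFor returns a PmemCSIDriver wrapping a defaulted copy of the
+// cached deployment that owns the given object, if there is one.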
+func (r *ReconcileDeployment) getDeploymentFor(obj metav1.Object) (*PmemCSIDriver, error) {
+	//klog.Infof("MapRequest: %s(%T)", obj.Meta.GetName(), obj.Meta)
+	r.deploymentsMutex.Lock()
+	defer r.deploymentsMutex.Unlock()
+	for name, d := range r.deployments {
+		selfRef := d.GetOwnerReference()
+		if isOwnedBy(obj, &selfRef) {
+			klog.Infof("Object '%s' is owned by the %q deployment", obj.GetName(), name)
+			deployment := d.DeepCopy()
+			if err := deployment.EnsureDefaults(r.containerImage); err != nil {
+				return nil, err
+			}
+			return &PmemCSIDriver{deployment, r.namespace, r.k8sVersion}, nil
+		}
+	}
+
+	return nil, fmt.Errorf("Not found")
+}
+
 // containerImage returns container image name used by operator Pod
 func containerImage(cs *kubernetes.Clientset, namespace string) (string, error) {
 	const podNameEnv = "POD_NAME"
@@ -339,7 +493,7 @@ func containerImage(cs *kubernetes.Clientset, namespace string) (string, error)
 // isOwnedBy checks if expectedOwner is in the object's owner references list
 func isOwnedBy(object metav1.Object, expectedOwner *metav1.OwnerReference) bool {
 	for _, owner := range object.GetOwnerReferences() {
-		if reflect.DeepEqual(&owner, expectedOwner) {
+		if owner.UID == expectedOwner.UID {
 			return true
 		}
 	}
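The switch in isOwnedBy from reflect.DeepEqual to a UID comparison matters because the owner reference stored on an owned object may carry fields (for example BlockOwnerDeletion, which other clients or the API server may default) that are not set in the reference freshly built by GetOwnerReference(), so a deep comparison can report false for the same owner, while the UID alone identifies the owning Deployment CR. A minimal standalone Go sketch of the difference (illustrative only, not part of the patch; the kind, name, and UID below are made up):

package main

import (
	"fmt"
	"reflect"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

func main() {
	isController := true
	block := true
	uid := types.UID("11111111-2222-3333-4444-555555555555") // hypothetical UID

	// Owner reference as built by the owning CR itself.
	expected := metav1.OwnerReference{
		Kind:       "Deployment",
		Name:       "example-deployment", // hypothetical name
		UID:        uid,
		Controller: &isController,
	}

	// The same owner as stored on an owned object, where BlockOwnerDeletion
	// was filled in by another client.
	stored := expected
	stored.BlockOwnerDeletion = &block

	fmt.Println(reflect.DeepEqual(&stored, &expected)) // false: an extra field differs
	fmt.Println(stored.UID == expected.UID)            // true: same owning CR
}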