From 20e7ae0bddeab5261711fed02587936821cfbf48 Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Thu, 23 Nov 2017 22:59:51 -0800 Subject: [PATCH] *: PVC for etcd datadir --- pkg/apis/etcd/v1beta2/cluster.go | 2 +- .../etcd/v1beta2/zz_generated.deepcopy.go | 9 ++++ pkg/cluster/cluster.go | 46 +++++++---------- pkg/cluster/reconcile.go | 49 ++++++++----------- pkg/controller/restore-operator/sync.go | 2 - pkg/util/etcdutil/member.go | 8 --- pkg/util/k8sutil/k8sutil.go | 42 ++++++---------- test/e2e/{etcd_on_pv_test.go => pv_test.go} | 14 +++++- 8 files changed, 77 insertions(+), 95 deletions(-) rename test/e2e/{etcd_on_pv_test.go => pv_test.go} (72%) diff --git a/pkg/apis/etcd/v1beta2/cluster.go b/pkg/apis/etcd/v1beta2/cluster.go index e178736e7..b1b36396c 100644 --- a/pkg/apis/etcd/v1beta2/cluster.go +++ b/pkg/apis/etcd/v1beta2/cluster.go @@ -137,7 +137,7 @@ type PodPolicy struct { EtcdEnv []v1.EnvVar `json:"etcdEnv,omitempty"` // PersistentVolumeClaimSpec ... - PersistentVolumeClaimSpec v1.PersistentVolumeClaimSpec `json:"persistentVolumeClaimSpec,omitempty"` + PersistentVolumeClaimSpec *v1.PersistentVolumeClaimSpec `json:"persistentVolumeClaimSpec,omitempty"` // By default, kubernetes will mount a service account token into the etcd pods. // AutomountServiceAccountToken indicates whether pods running with the service account should have an API token automatically mounted. diff --git a/pkg/apis/etcd/v1beta2/zz_generated.deepcopy.go b/pkg/apis/etcd/v1beta2/zz_generated.deepcopy.go index 6d098f8d4..7373436ab 100644 --- a/pkg/apis/etcd/v1beta2/zz_generated.deepcopy.go +++ b/pkg/apis/etcd/v1beta2/zz_generated.deepcopy.go @@ -529,6 +529,15 @@ func (in *PodPolicy) DeepCopyInto(out *PodPolicy) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.PersistentVolumeClaimSpec != nil { + in, out := &in.PersistentVolumeClaimSpec, &out.PersistentVolumeClaimSpec + if *in == nil { + *out = nil + } else { + *out = new(v1.PersistentVolumeClaimSpec) + (*in).DeepCopyInto(*out) + } + } if in.AutomountServiceAccountToken != nil { in, out := &in.AutomountServiceAccountToken, &out.AutomountServiceAccountToken if *in == nil { diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index d2f0deb13..6dbcaf3d1 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -255,13 +255,6 @@ func (c *Cluster) run() { continue } - pvcs, err := c.pollPVCs() - if err != nil { - c.logger.Errorf("failed to poll pvcs: %v", err) - reconcileFailed.WithLabelValues("failed to poll vcs").Inc() - continue - } - if len(pending) > 0 { // Pod startup might take long, e.g. pulling image. It would deterministically become running or succeeded/failed later. c.logger.Infof("skip reconciliation: running (%v), pending (%v)", k8sutil.GetPodNames(running), k8sutil.GetPodNames(pending)) @@ -282,7 +275,7 @@ func (c *Cluster) run() { break } } - rerr = c.reconcile(running, pvcs) + rerr = c.reconcile(running) if rerr != nil { c.logger.Errorf("failed to reconcile: %v", rerr) break @@ -362,10 +355,6 @@ func (c *Cluster) isSecureClient() bool { return c.cluster.Spec.TLS.IsSecureClient() } -func (c *Cluster) IsPodPVEnabled() bool { - return c.cluster.Spec.Pod != nil && c.cluster.Spec.Pod.PV != nil -} - // bootstrap creates the seed etcd member for a new cluster. func (c *Cluster) bootstrap() error { return c.startSeedMember() @@ -387,9 +376,26 @@ func (c *Cluster) setupServices() error { return k8sutil.CreatePeerService(c.config.KubeCli, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner()) } +func (c *Cluster) IsPodPVEnabled() bool { + if podPolicy := c.cluster.Spec.Pod; podPolicy != nil { + return podPolicy.PersistentVolumeClaimSpec != nil + } + return false +} + func (c *Cluster) createPod(members etcdutil.MemberSet, m *etcdutil.Member, state string) error { pod := k8sutil.NewEtcdPod(m, members.PeerURLPairs(), c.cluster.Name, state, uuid.New(), c.cluster.Spec, c.cluster.AsOwner()) - _, err := c.config.KubeCli.Core().Pods(c.cluster.Namespace).Create(pod) + if c.IsPodPVEnabled() { + pvc := k8sutil.NewEtcdPodPVC(m, *c.cluster.Spec.Pod.PersistentVolumeClaimSpec, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner()) + _, err := c.config.KubeCli.CoreV1().PersistentVolumeClaims(c.cluster.Namespace).Create(pvc) + if err != nil { + return fmt.Errorf("failed to create PVC for member (%s): %v", m.Name, err) + } + k8sutil.AddEtcdVolumeToPod(pod, pvc) + } else { + k8sutil.AddEtcdVolumeToPod(pod, nil) + } + _, err := c.config.KubeCli.CoreV1().Pods(c.cluster.Namespace).Create(pod) return err } @@ -412,20 +418,6 @@ func (c *Cluster) removePod(name string) error { return nil } -func (c *Cluster) removePVC(name string) error { - ns := c.cluster.Namespace - err := c.config.KubeCli.Core().PersistentVolumeClaims(ns).Delete(name, nil) - if err != nil { - if !k8sutil.IsKubernetesResourceNotFoundError(err) { - return err - } - if c.isDebugLoggerEnabled() { - c.debugLogger.LogMessage(fmt.Sprintf("pvc (%s) not found while trying to delete it", name)) - } - } - return nil -} - func (c *Cluster) pollPods() (running, pending []*v1.Pod, err error) { podList, err := c.config.KubeCli.Core().Pods(c.cluster.Namespace).List(k8sutil.ClusterListOpt(c.cluster.Name)) if err != nil { diff --git a/pkg/cluster/reconcile.go b/pkg/cluster/reconcile.go index 1147e3966..7e0589878 100644 --- a/pkg/cluster/reconcile.go +++ b/pkg/cluster/reconcile.go @@ -35,7 +35,7 @@ var ErrLostQuorum = errors.New("lost quorum") // reconcile reconciles cluster current state to desired state specified by spec. // - it tries to reconcile the cluster to desired size. // - if the cluster needs for upgrade, it tries to upgrade old member one by one. -func (c *Cluster) reconcile(pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) error { +func (c *Cluster) reconcile(pods []*v1.Pod) error { c.logger.Infoln("Start reconciling") defer c.logger.Infoln("Finish reconciling") @@ -50,10 +50,6 @@ func (c *Cluster) reconcile(pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) er } c.status.ClearCondition(api.ClusterConditionScaling) - if err := c.reconcilePVCs(pvcs); err != nil { - return err - } - if needUpgrade(pods, sp) { c.status.UpgradeVersionTo(sp.Version) @@ -106,26 +102,6 @@ func (c *Cluster) reconcileMembers(running etcdutil.MemberSet) error { return c.removeDeadMember(c.members.Diff(L).PickOne()) } -// reconcilePVCs reconciles PVCs with current cluster members removing old PVCs -func (c *Cluster) reconcilePVCs(pvcs []*v1.PersistentVolumeClaim) error { - oldPVCs := []string{} - for _, pvc := range pvcs { - memberName := etcdutil.MemberNameFromPVCName(pvc.Name) - if _, ok := c.members[memberName]; !ok { - oldPVCs = append(oldPVCs, pvc.Name) - } - } - - for _, oldPVC := range oldPVCs { - c.logger.Infof("removing old pvc: %s", oldPVC) - if err := c.removePVC(oldPVC); err != nil { - return err - } - } - - return nil -} - func (c *Cluster) resize() error { if c.members.Size() == c.cluster.Spec.Size { return nil @@ -217,14 +193,19 @@ func (c *Cluster) removeDeadMember(toRemove *etcdutil.Member) error { return c.removeMember(toRemove) } -func (c *Cluster) removeMember(toRemove *etcdutil.Member) error { - err := etcdutil.RemoveMember(c.members.ClientURLs(), c.tlsConfig, toRemove.ID) +func (c *Cluster) removeMember(toRemove *etcdutil.Member) (err error) { + defer func() { + if err != nil { + err = fmt.Errorf("remove member (%s) failed: %v", toRemove.Name, err) + } + }() + + err = etcdutil.RemoveMember(c.members.ClientURLs(), c.tlsConfig, toRemove.ID) if err != nil { switch err { case rpctypes.ErrMemberNotFound: c.logger.Infof("etcd member (%v) has been removed", toRemove.Name) default: - c.logger.Errorf("fail to remove etcd member (%v): %v", toRemove.Name, err) return err } } @@ -236,10 +217,22 @@ func (c *Cluster) removeMember(toRemove *etcdutil.Member) error { if err := c.removePod(toRemove.Name); err != nil { return err } + err = c.removePVC(k8sutil.PVCNameFromMemberName(toRemove.Name)) + if err != nil { + return err + } c.logger.Infof("removed member (%v) with ID (%d)", toRemove.Name, toRemove.ID) return nil } +func (c *Cluster) removePVC(name string) error { + err := c.config.KubeCli.Core().PersistentVolumeClaims(c.cluster.Namespace).Delete(name, nil) + if err != nil && !k8sutil.IsKubernetesResourceNotFoundError(err) { + return fmt.Errorf("remove pvc (%s) failed: %v", name, err) + } + return nil +} + func needUpgrade(pods []*v1.Pod, cs api.ClusterSpec) bool { return len(pods) == cs.Size && pickOneOldMember(pods, cs.Version) != nil } diff --git a/pkg/controller/restore-operator/sync.go b/pkg/controller/restore-operator/sync.go index 68dd5b27d..d10f31ffa 100644 --- a/pkg/controller/restore-operator/sync.go +++ b/pkg/controller/restore-operator/sync.go @@ -186,9 +186,7 @@ func (r *Restore) createSeedMember(cs api.ClusterSpec, svcAddr, clusterName stri ms := etcdutil.NewMemberSet(m) backupURL := backupapi.BackupURLForRestore("http", svcAddr, clusterName) cs.Cleanup() - isPodPVEnabled := cs.Pod != nil && cs.Pod.PV != nil pod := k8sutil.NewSeedMemberPod(clusterName, ms, m, cs, owner, backupURL) - k8sutil.AddEtcdVolumeToPod(pod, m, isPodPVEnabled) _, err := r.kubecli.Core().Pods(r.namespace).Create(pod) return err } diff --git a/pkg/util/etcdutil/member.go b/pkg/util/etcdutil/member.go index 83dbeff1a..ffc80c256 100644 --- a/pkg/util/etcdutil/member.go +++ b/pkg/util/etcdutil/member.go @@ -188,11 +188,3 @@ func clusterNameFromMemberName(mn string) string { } return mn[:i] } - -func MemberNameFromPVCName(pn string) string { - i := strings.LastIndex(pn, "-") - if i == -1 { - panic(fmt.Sprintf("unexpected pvc name: %s", pn)) - } - return pn[:i] -} diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 8ebaf1cd7..b66cc4721 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -31,7 +31,6 @@ import ( appsv1beta1 "k8s.io/api/apps/v1beta1" "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" @@ -79,8 +78,8 @@ func GetPodNames(pods []*v1.Pod) []string { return res } -func etcdPVCName(m *etcdutil.Member) string { - return fmt.Sprintf("%s-pvc", m.Name) +func PVCNameFromMemberName(memberName string) string { + return memberName } func makeRestoreInitContainers(backupURL *url.URL, token, baseImage, version string, m *etcdutil.Member) []v1.Container { @@ -214,12 +213,16 @@ func newEtcdServiceManifest(svcName, clusterName, clusterIP string, ports []v1.S return svc } -func AddEtcdVolumeToPod(pod *v1.Pod, m *etcdutil.Member, usePVC bool) { - if usePVC { - pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{Name: etcdVolumeName, VolumeSource: v1.VolumeSource{PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: etcdPVCName(m)}}}) +func AddEtcdVolumeToPod(pod *v1.Pod, pvc *v1.PersistentVolumeClaim) { + vol := v1.Volume{Name: etcdVolumeName} + if pvc != nil { + vol.VolumeSource = v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: pvc.Name}, + } } else { - pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{Name: etcdVolumeName, VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}}) + vol.VolumeSource = v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}} } + pod.Spec.Volumes = append(pod.Spec.Volumes, vol) } func addRecoveryToPod(pod *v1.Pod, token string, m *etcdutil.Member, cs api.ClusterSpec, backupURL *url.URL) { @@ -241,33 +244,16 @@ func NewSeedMemberPod(clusterName string, ms etcdutil.MemberSet, m *etcdutil.Mem return pod } -func NewPVC(m *etcdutil.Member, cs api.ClusterSpec, clusterName, namespace string, owner metav1.OwnerReference) *v1.PersistentVolumeClaim { - name := etcdPVCName(m) +func NewEtcdPodPVC(m *etcdutil.Member, pvcSpec v1.PersistentVolumeClaimSpec, clusterName, namespace string, owner metav1.OwnerReference) *v1.PersistentVolumeClaim { pvc := &v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ - Name: name, + Name: PVCNameFromMemberName(m.Name), Namespace: namespace, - Labels: map[string]string{ - "etcd_node": m.Name, - "etcd_cluster": clusterName, - "app": "etcd", - }, - }, - Spec: v1.PersistentVolumeClaimSpec{ - StorageClassName: &cs.Pod.PV.StorageClass, - AccessModes: []v1.PersistentVolumeAccessMode{ - v1.ReadWriteOnce, - }, - Resources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceStorage: resource.MustParse(fmt.Sprintf("%dMi", cs.Pod.PV.VolumeSizeInMB)), - }, - }, + Labels: LabelsForCluster(clusterName), }, + Spec: pvcSpec, } - addOwnerRefToObject(pvc.GetObjectMeta(), owner) - return pvc } diff --git a/test/e2e/etcd_on_pv_test.go b/test/e2e/pv_test.go similarity index 72% rename from test/e2e/etcd_on_pv_test.go rename to test/e2e/pv_test.go index 7d81d25fd..36b90d673 100644 --- a/test/e2e/etcd_on_pv_test.go +++ b/test/e2e/pv_test.go @@ -18,6 +18,10 @@ import ( "os" "testing" + api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "github.com/coreos/etcd-operator/test/e2e/e2eutil" "github.com/coreos/etcd-operator/test/e2e/framework" ) @@ -28,7 +32,15 @@ func TestCreateClusterWithPV(t *testing.T) { } f := framework.Global c := e2eutil.NewCluster("test-etcd-", 3) - e2eutil.AddPV(c, f.StorageClassName) + c.Spec.Pod = &api.PodPolicy{ + PersistentVolumeClaimSpec: &v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("512Mi")}, + }, + StorageClassName: func(s string) *string { return &s }("standard"), + }, + } testEtcd, err := e2eutil.CreateCluster(t, f.CRClient, f.Namespace, c) if err != nil {