Skip to content
This repository has been archived by the owner on Mar 28, 2020. It is now read-only.

Commit

Permalink
*: PVC for etcd datadir
Browse files Browse the repository at this point in the history
  • Loading branch information
hongchaodeng committed Nov 24, 2017
1 parent 684cfe7 commit 87d6b55
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 95 deletions.
2 changes: 1 addition & 1 deletion pkg/apis/etcd/v1beta2/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ type PodPolicy struct {
EtcdEnv []v1.EnvVar `json:"etcdEnv,omitempty"`

// PersistentVolumeClaimSpec ...
PersistentVolumeClaimSpec v1.PersistentVolumeClaimSpec `json:"persistentVolumeClaimSpec,omitempty"`
PersistentVolumeClaimSpec *v1.PersistentVolumeClaimSpec `json:"persistentVolumeClaimSpec,omitempty"`

// By default, kubernetes will mount a service account token into the etcd pods.
// AutomountServiceAccountToken indicates whether pods running with the service account should have an API token automatically mounted.
Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/etcd/v1beta2/zz_generated.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,15 @@ func (in *PodPolicy) DeepCopyInto(out *PodPolicy) {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.PersistentVolumeClaimSpec != nil {
in, out := &in.PersistentVolumeClaimSpec, &out.PersistentVolumeClaimSpec
if *in == nil {
*out = nil
} else {
*out = new(v1.PersistentVolumeClaimSpec)
(*in).DeepCopyInto(*out)
}
}
if in.AutomountServiceAccountToken != nil {
in, out := &in.AutomountServiceAccountToken, &out.AutomountServiceAccountToken
if *in == nil {
Expand Down
46 changes: 19 additions & 27 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,13 +255,6 @@ func (c *Cluster) run() {
continue
}

pvcs, err := c.pollPVCs()
if err != nil {
c.logger.Errorf("failed to poll pvcs: %v", err)
reconcileFailed.WithLabelValues("failed to poll vcs").Inc()
continue
}

if len(pending) > 0 {
// Pod startup might take long, e.g. pulling image. It would deterministically become running or succeeded/failed later.
c.logger.Infof("skip reconciliation: running (%v), pending (%v)", k8sutil.GetPodNames(running), k8sutil.GetPodNames(pending))
Expand All @@ -282,7 +275,7 @@ func (c *Cluster) run() {
break
}
}
rerr = c.reconcile(running, pvcs)
rerr = c.reconcile(running)
if rerr != nil {
c.logger.Errorf("failed to reconcile: %v", rerr)
break
Expand Down Expand Up @@ -362,10 +355,6 @@ func (c *Cluster) isSecureClient() bool {
return c.cluster.Spec.TLS.IsSecureClient()
}

func (c *Cluster) IsPodPVEnabled() bool {
return c.cluster.Spec.Pod != nil && c.cluster.Spec.Pod.PV != nil
}

// bootstrap creates the seed etcd member for a new cluster.
func (c *Cluster) bootstrap() error {
return c.startSeedMember()
Expand All @@ -387,9 +376,26 @@ func (c *Cluster) setupServices() error {
return k8sutil.CreatePeerService(c.config.KubeCli, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner())
}

func (c *Cluster) IsPodPVEnabled() bool {
if podPolicy := c.cluster.Spec.Pod; podPolicy != nil {
return podPolicy.PersistentVolumeClaimSpec != nil
}
return false
}

func (c *Cluster) createPod(members etcdutil.MemberSet, m *etcdutil.Member, state string) error {
pod := k8sutil.NewEtcdPod(m, members.PeerURLPairs(), c.cluster.Name, state, uuid.New(), c.cluster.Spec, c.cluster.AsOwner())
_, err := c.config.KubeCli.Core().Pods(c.cluster.Namespace).Create(pod)
if c.IsPodPVEnabled() {
pvc := k8sutil.NewEtcdPodPVC(m, *c.cluster.Spec.Pod.PersistentVolumeClaimSpec, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner())
_, err := c.config.KubeCli.CoreV1().PersistentVolumeClaims(c.cluster.Namespace).Create(pvc)
if err != nil {
return fmt.Errorf("failed to create PVC for member (%s): %v", m.Name, err)
}
k8sutil.AddEtcdVolumeToPod(pod, pvc)
} else {
k8sutil.AddEtcdVolumeToPod(pod, nil)
}
_, err := c.config.KubeCli.CoreV1().Pods(c.cluster.Namespace).Create(pod)
return err
}

Expand All @@ -412,20 +418,6 @@ func (c *Cluster) removePod(name string) error {
return nil
}

func (c *Cluster) removePVC(name string) error {
ns := c.cluster.Namespace
err := c.config.KubeCli.Core().PersistentVolumeClaims(ns).Delete(name, nil)
if err != nil {
if !k8sutil.IsKubernetesResourceNotFoundError(err) {
return err
}
if c.isDebugLoggerEnabled() {
c.debugLogger.LogMessage(fmt.Sprintf("pvc (%s) not found while trying to delete it", name))
}
}
return nil
}

func (c *Cluster) pollPods() (running, pending []*v1.Pod, err error) {
podList, err := c.config.KubeCli.Core().Pods(c.cluster.Namespace).List(k8sutil.ClusterListOpt(c.cluster.Name))
if err != nil {
Expand Down
49 changes: 21 additions & 28 deletions pkg/cluster/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var ErrLostQuorum = errors.New("lost quorum")
// reconcile reconciles cluster current state to desired state specified by spec.
// - it tries to reconcile the cluster to desired size.
// - if the cluster needs for upgrade, it tries to upgrade old member one by one.
func (c *Cluster) reconcile(pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) error {
func (c *Cluster) reconcile(pods []*v1.Pod) error {
c.logger.Infoln("Start reconciling")
defer c.logger.Infoln("Finish reconciling")

Expand All @@ -50,10 +50,6 @@ func (c *Cluster) reconcile(pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) er
}
c.status.ClearCondition(api.ClusterConditionScaling)

if err := c.reconcilePVCs(pvcs); err != nil {
return err
}

if needUpgrade(pods, sp) {
c.status.UpgradeVersionTo(sp.Version)

Expand Down Expand Up @@ -106,26 +102,6 @@ func (c *Cluster) reconcileMembers(running etcdutil.MemberSet) error {
return c.removeDeadMember(c.members.Diff(L).PickOne())
}

// reconcilePVCs reconciles PVCs with current cluster members removing old PVCs
func (c *Cluster) reconcilePVCs(pvcs []*v1.PersistentVolumeClaim) error {
oldPVCs := []string{}
for _, pvc := range pvcs {
memberName := etcdutil.MemberNameFromPVCName(pvc.Name)
if _, ok := c.members[memberName]; !ok {
oldPVCs = append(oldPVCs, pvc.Name)
}
}

for _, oldPVC := range oldPVCs {
c.logger.Infof("removing old pvc: %s", oldPVC)
if err := c.removePVC(oldPVC); err != nil {
return err
}
}

return nil
}

func (c *Cluster) resize() error {
if c.members.Size() == c.cluster.Spec.Size {
return nil
Expand Down Expand Up @@ -217,14 +193,19 @@ func (c *Cluster) removeDeadMember(toRemove *etcdutil.Member) error {
return c.removeMember(toRemove)
}

func (c *Cluster) removeMember(toRemove *etcdutil.Member) error {
err := etcdutil.RemoveMember(c.members.ClientURLs(), c.tlsConfig, toRemove.ID)
func (c *Cluster) removeMember(toRemove *etcdutil.Member) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("remove member (%s) failed: %v", toRemove.Name, err)
}
}()

err = etcdutil.RemoveMember(c.members.ClientURLs(), c.tlsConfig, toRemove.ID)
if err != nil {
switch err {
case rpctypes.ErrMemberNotFound:
c.logger.Infof("etcd member (%v) has been removed", toRemove.Name)
default:
c.logger.Errorf("fail to remove etcd member (%v): %v", toRemove.Name, err)
return err
}
}
Expand All @@ -236,10 +217,22 @@ func (c *Cluster) removeMember(toRemove *etcdutil.Member) error {
if err := c.removePod(toRemove.Name); err != nil {
return err
}
err = c.removePVC(k8sutil.PVCNameFromMemberName(toRemove.Name))
if err != nil {
return err
}
c.logger.Infof("removed member (%v) with ID (%d)", toRemove.Name, toRemove.ID)
return nil
}

func (c *Cluster) removePVC(name string) error {
err := c.config.KubeCli.Core().PersistentVolumeClaims(c.cluster.Namespace).Delete(name, nil)
if err != nil && !k8sutil.IsKubernetesResourceNotFoundError(err) {
return fmt.Errorf("remove pvc (%s) failed: %v", name, err)
}
return nil
}

func needUpgrade(pods []*v1.Pod, cs api.ClusterSpec) bool {
return len(pods) == cs.Size && pickOneOldMember(pods, cs.Version) != nil
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/controller/restore-operator/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,7 @@ func (r *Restore) createSeedMember(cs api.ClusterSpec, svcAddr, clusterName stri
ms := etcdutil.NewMemberSet(m)
backupURL := backupapi.BackupURLForRestore("http", svcAddr, clusterName)
cs.Cleanup()
isPodPVEnabled := cs.Pod != nil && cs.Pod.PV != nil
pod := k8sutil.NewSeedMemberPod(clusterName, ms, m, cs, owner, backupURL)
k8sutil.AddEtcdVolumeToPod(pod, m, isPodPVEnabled)
_, err := r.kubecli.Core().Pods(r.namespace).Create(pod)
return err
}
8 changes: 0 additions & 8 deletions pkg/util/etcdutil/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,3 @@ func clusterNameFromMemberName(mn string) string {
}
return mn[:i]
}

func MemberNameFromPVCName(pn string) string {
i := strings.LastIndex(pn, "-")
if i == -1 {
panic(fmt.Sprintf("unexpected pvc name: %s", pn))
}
return pn[:i]
}
42 changes: 14 additions & 28 deletions pkg/util/k8sutil/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
appsv1beta1 "k8s.io/api/apps/v1beta1"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -79,8 +78,8 @@ func GetPodNames(pods []*v1.Pod) []string {
return res
}

func etcdPVCName(m *etcdutil.Member) string {
return fmt.Sprintf("%s-pvc", m.Name)
func PVCNameFromMemberName(memberName string) string {
return memberName
}

func makeRestoreInitContainers(backupURL *url.URL, token, baseImage, version string, m *etcdutil.Member) []v1.Container {
Expand Down Expand Up @@ -214,12 +213,16 @@ func newEtcdServiceManifest(svcName, clusterName, clusterIP string, ports []v1.S
return svc
}

func AddEtcdVolumeToPod(pod *v1.Pod, m *etcdutil.Member, usePVC bool) {
if usePVC {
pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{Name: etcdVolumeName, VolumeSource: v1.VolumeSource{PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: etcdPVCName(m)}}})
func AddEtcdVolumeToPod(pod *v1.Pod, pvc *v1.PersistentVolumeClaim) {
vol := v1.Volume{Name: etcdVolumeName}
if pvc != nil {
vol.VolumeSource = v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: pvc.Name},
}
} else {
pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{Name: etcdVolumeName, VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}})
vol.VolumeSource = v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}
}
pod.Spec.Volumes = append(pod.Spec.Volumes, vol)
}

func addRecoveryToPod(pod *v1.Pod, token string, m *etcdutil.Member, cs api.ClusterSpec, backupURL *url.URL) {
Expand All @@ -241,33 +244,16 @@ func NewSeedMemberPod(clusterName string, ms etcdutil.MemberSet, m *etcdutil.Mem
return pod
}

func NewPVC(m *etcdutil.Member, cs api.ClusterSpec, clusterName, namespace string, owner metav1.OwnerReference) *v1.PersistentVolumeClaim {
name := etcdPVCName(m)
func NewEtcdPodPVC(m *etcdutil.Member, pvcSpec v1.PersistentVolumeClaimSpec, clusterName, namespace string, owner metav1.OwnerReference) *v1.PersistentVolumeClaim {
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Name: PVCNameFromMemberName(m.Name),
Namespace: namespace,
Labels: map[string]string{
"etcd_node": m.Name,
"etcd_cluster": clusterName,
"app": "etcd",
},
},
Spec: v1.PersistentVolumeClaimSpec{
StorageClassName: &cs.Pod.PV.StorageClass,
AccessModes: []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,
},
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceStorage: resource.MustParse(fmt.Sprintf("%dMi", cs.Pod.PV.VolumeSizeInMB)),
},
},
Labels: LabelsForCluster(clusterName),
},
Spec: pvcSpec,
}

addOwnerRefToObject(pvc.GetObjectMeta(), owner)

return pvc
}

Expand Down
13 changes: 12 additions & 1 deletion test/e2e/etcd_on_pv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"os"
"testing"

api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/coreos/etcd-operator/test/e2e/e2eutil"
"github.com/coreos/etcd-operator/test/e2e/framework"
)
Expand All @@ -28,7 +32,14 @@ func TestCreateClusterWithPV(t *testing.T) {
}
f := framework.Global
c := e2eutil.NewCluster("test-etcd-", 3)
e2eutil.AddPV(c, f.StorageClassName)
c.Spec.Pod = &api.PodPolicy{
PersistentVolumeClaimSpec: &v1.PersistentVolumeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("512Mi")},
},
StorageClassName: func(s string) *string { return &s }("standard"),
},
}

testEtcd, err := e2eutil.CreateCluster(t, f.CRClient, f.Namespace, c)
if err != nil {
Expand Down

0 comments on commit 87d6b55

Please sign in to comment.