Skip to content
This repository has been archived by the owner on Mar 28, 2020. It is now read-only.

Commit

Permalink
*: implement PersistentVolume for etcd data design part 1
Browse files Browse the repository at this point in the history
This patch implements part 1 of the design for storing etcd data on persistent volumes.

When the pod PV source is defined in the spec, the operator creates a PVC for every etcd
member and uses it as the volume for etcd data.

A PVC without a corresponding member is removed during reconciliation.
  • Loading branch information
sgotti authored and hongchaodeng committed Nov 23, 2017
1 parent 62d20d4 commit 684cfe7
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 5 deletions.
3 changes: 3 additions & 0 deletions pkg/apis/etcd/v1beta2/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ type PodPolicy struct {
// This field cannot be updated.
EtcdEnv []v1.EnvVar `json:"etcdEnv,omitempty"`

// PersistentVolumeClaimSpec ...
PersistentVolumeClaimSpec v1.PersistentVolumeClaimSpec `json:"persistentVolumeClaimSpec,omitempty"`

// By default, kubernetes will mount a service account token into the etcd pods.
// AutomountServiceAccountToken indicates whether pods running with the service account should have an API token automatically mounted.
AutomountServiceAccountToken *bool `json:"automountServiceAccountToken,omitempty"`
Expand Down
28 changes: 27 additions & 1 deletion pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,13 @@ func (c *Cluster) run() {
continue
}

pvcs, err := c.pollPVCs()
if err != nil {
c.logger.Errorf("failed to poll pvcs: %v", err)
reconcileFailed.WithLabelValues("failed to poll vcs").Inc()
continue
}

if len(pending) > 0 {
// Pod startup might take long, e.g. pulling image. It would deterministically become running or succeeded/failed later.
c.logger.Infof("skip reconciliation: running (%v), pending (%v)", k8sutil.GetPodNames(running), k8sutil.GetPodNames(pending))
Expand All @@ -275,7 +282,7 @@ func (c *Cluster) run() {
break
}
}
rerr = c.reconcile(running)
rerr = c.reconcile(running, pvcs)
if rerr != nil {
c.logger.Errorf("failed to reconcile: %v", rerr)
break
Expand Down Expand Up @@ -355,6 +362,10 @@ func (c *Cluster) isSecureClient() bool {
return c.cluster.Spec.TLS.IsSecureClient()
}

// IsPodPVEnabled reports whether the cluster spec carries a pod policy
// whose PV field is set (non-nil).
func (c *Cluster) IsPodPVEnabled() bool {
	podPolicy := c.cluster.Spec.Pod
	if podPolicy == nil {
		// No pod policy at all, so there is no PV configuration to look at.
		return false
	}
	return podPolicy.PV != nil
}

// bootstrap creates the seed etcd member for a new cluster.
func (c *Cluster) bootstrap() error {
return c.startSeedMember()
Expand Down Expand Up @@ -397,6 +408,21 @@ func (c *Cluster) removePod(name string) error {
if c.isDebugLoggerEnabled() {
c.debugLogger.LogPodDeletion(name)
}

return nil
}

// removePVC deletes the PersistentVolumeClaim called name from the
// cluster's namespace.
//
// An error for which k8sutil.IsKubernetesResourceNotFoundError reports true
// is not returned: it is written to the debug logger (when that logger is
// enabled) and the call succeeds. Any other delete error is returned as is.
func (c *Cluster) removePVC(name string) error {
	delErr := c.config.KubeCli.Core().PersistentVolumeClaims(c.cluster.Namespace).Delete(name, nil)
	if delErr == nil {
		return nil
	}
	if !k8sutil.IsKubernetesResourceNotFoundError(delErr) {
		return delErr
	}
	// The claim is already gone; record that fact but do not fail.
	if c.isDebugLoggerEnabled() {
		c.debugLogger.LogMessage(fmt.Sprintf("pvc (%s) not found while trying to delete it", name))
	}
	return nil
}

Expand Down
26 changes: 25 additions & 1 deletion pkg/cluster/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var ErrLostQuorum = errors.New("lost quorum")
// reconcile reconciles cluster current state to desired state specified by spec.
// - it tries to reconcile the cluster to desired size.
// - if the cluster needs for upgrade, it tries to upgrade old member one by one.
func (c *Cluster) reconcile(pods []*v1.Pod) error {
func (c *Cluster) reconcile(pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) error {
c.logger.Infoln("Start reconciling")
defer c.logger.Infoln("Finish reconciling")

Expand All @@ -50,6 +50,10 @@ func (c *Cluster) reconcile(pods []*v1.Pod) error {
}
c.status.ClearCondition(api.ClusterConditionScaling)

if err := c.reconcilePVCs(pvcs); err != nil {
return err
}

if needUpgrade(pods, sp) {
c.status.UpgradeVersionTo(sp.Version)

Expand Down Expand Up @@ -102,6 +106,26 @@ func (c *Cluster) reconcileMembers(running etcdutil.MemberSet) error {
return c.removeDeadMember(c.members.Diff(L).PickOne())
}

// reconcilePVCs removes every PVC in pvcs whose member name, as derived by
// etcdutil.MemberNameFromPVCName, is not a key of c.members.
//
// The stale PVC names are gathered in a first pass and deleted in a second
// one. The first deletion error stops the pass and is returned.
func (c *Cluster) reconcilePVCs(pvcs []*v1.PersistentVolumeClaim) error {
	var stale []string
	for i := range pvcs {
		claimName := pvcs[i].Name
		owner := etcdutil.MemberNameFromPVCName(claimName)
		if _, isMember := c.members[owner]; isMember {
			continue
		}
		stale = append(stale, claimName)
	}

	for _, claimName := range stale {
		c.logger.Infof("removing old pvc: %s", claimName)
		if err := c.removePVC(claimName); err != nil {
			return err
		}
	}
	return nil
}

func (c *Cluster) resize() error {
if c.members.Size() == c.cluster.Spec.Size {
return nil
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/restore-operator/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ func (r *Restore) createSeedMember(cs api.ClusterSpec, svcAddr, clusterName stri
ms := etcdutil.NewMemberSet(m)
backupURL := backupapi.BackupURLForRestore("http", svcAddr, clusterName)
cs.Cleanup()
isPodPVEnabled := cs.Pod != nil && cs.Pod.PV != nil
pod := k8sutil.NewSeedMemberPod(clusterName, ms, m, cs, owner, backupURL)
k8sutil.AddEtcdVolumeToPod(pod, m, isPodPVEnabled)
_, err := r.kubecli.Core().Pods(r.namespace).Create(pod)
return err
}
8 changes: 8 additions & 0 deletions pkg/util/etcdutil/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,11 @@ func clusterNameFromMemberName(mn string) string {
}
return mn[:i]
}

// MemberNameFromPVCName returns the part of pn that precedes its last "-",
// i.e. the PVC name with its final dash-separated suffix removed.
// It panics if pn contains no "-".
func MemberNameFromPVCName(pn string) string {
	if cut := strings.LastIndex(pn, "-"); cut >= 0 {
		return pn[:cut]
	}
	panic(fmt.Sprintf("unexpected pvc name: %s", pn))
}
47 changes: 44 additions & 3 deletions pkg/util/k8sutil/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
appsv1beta1 "k8s.io/api/apps/v1beta1"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -78,6 +79,10 @@ func GetPodNames(pods []*v1.Pod) []string {
return res
}

// etcdPVCName returns the PVC name used for member m: the member's name
// followed by the "-pvc" suffix.
func etcdPVCName(m *etcdutil.Member) string {
	return m.Name + "-pvc"
}

func makeRestoreInitContainers(backupURL *url.URL, token, baseImage, version string, m *etcdutil.Member) []v1.Container {
return []v1.Container{
{
Expand Down Expand Up @@ -209,6 +214,14 @@ func newEtcdServiceManifest(svcName, clusterName, clusterIP string, ports []v1.S
return svc
}

// AddEtcdVolumeToPod appends one volume named etcdVolumeName to the pod's
// volume list. When usePVC is true the volume is backed by the
// PersistentVolumeClaim named etcdPVCName(m); otherwise it is an EmptyDir.
func AddEtcdVolumeToPod(pod *v1.Pod, m *etcdutil.Member, usePVC bool) {
	var source v1.VolumeSource
	if usePVC {
		source.PersistentVolumeClaim = &v1.PersistentVolumeClaimVolumeSource{ClaimName: etcdPVCName(m)}
	} else {
		source.EmptyDir = &v1.EmptyDirVolumeSource{}
	}

	dataVolume := v1.Volume{Name: etcdVolumeName, VolumeSource: source}
	pod.Spec.Volumes = append(pod.Spec.Volumes, dataVolume)
}

// addRecoveryToPod replaces the pod's init containers with the list returned
// by makeRestoreInitContainers for the given backup URL, token, member, and
// the base image and version taken from cs.
func addRecoveryToPod(pod *v1.Pod, token string, m *etcdutil.Member, cs api.ClusterSpec, backupURL *url.URL) {
	restoreContainers := makeRestoreInitContainers(backupURL, token, cs.BaseImage, cs.Version, m)
	pod.Spec.InitContainers = restoreContainers
}
Expand All @@ -228,6 +241,36 @@ func NewSeedMemberPod(clusterName string, ms etcdutil.MemberSet, m *etcdutil.Mem
return pod
}

// NewPVC builds the PersistentVolumeClaim object for member m.
//
// The claim is named etcdPVCName(m), placed in namespace, and labelled with
// the member name ("etcd_node"), clusterName ("etcd_cluster") and
// "app": "etcd". It requests ReadWriteOnce access and
// cs.Pod.PV.VolumeSizeInMB mebibytes of storage from the storage class
// cs.Pod.PV.StorageClass, and owner is attached via addOwnerRefToObject.
//
// cs.Pod and cs.Pod.PV are dereferenced without a nil check, so they are
// assumed to be set by the caller.
func NewPVC(m *etcdutil.Member, cs api.ClusterSpec, clusterName, namespace string, owner metav1.OwnerReference) *v1.PersistentVolumeClaim {
	meta := metav1.ObjectMeta{
		Name:      etcdPVCName(m),
		Namespace: namespace,
		Labels: map[string]string{
			"etcd_node":    m.Name,
			"etcd_cluster": clusterName,
			"app":          "etcd",
		},
	}

	spec := v1.PersistentVolumeClaimSpec{
		// Points at the field inside the spec's PV settings, not at a copy.
		StorageClassName: &cs.Pod.PV.StorageClass,
		AccessModes:      []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
	}
	spec.Resources = v1.ResourceRequirements{
		Requests: v1.ResourceList{
			v1.ResourceStorage: resource.MustParse(fmt.Sprintf("%dMi", cs.Pod.PV.VolumeSizeInMB)),
		},
	}

	claim := &v1.PersistentVolumeClaim{ObjectMeta: meta, Spec: spec}
	addOwnerRefToObject(claim.GetObjectMeta(), owner)
	return claim
}

func NewEtcdPod(m *etcdutil.Member, initialCluster []string, clusterName, state, token string, cs api.ClusterSpec, owner metav1.OwnerReference) *v1.Pod {
commands := fmt.Sprintf("/usr/local/bin/etcd --data-dir=%s --name=%s --initial-advertise-peer-urls=%s "+
"--listen-peer-urls=%s --listen-client-urls=%s --advertise-client-urls=%s "+
Expand Down Expand Up @@ -257,9 +300,7 @@ func NewEtcdPod(m *etcdutil.Member, initialCluster []string, clusterName, state,
container = containerWithRequirements(container, cs.Pod.Resources)
}

volumes := []v1.Volume{
{Name: "etcd-data", VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}},
}
volumes := []v1.Volume{}

if m.SecurePeer {
container.VolumeMounts = append(container.VolumeMounts, v1.VolumeMount{
Expand Down
47 changes: 47 additions & 0 deletions test/e2e/etcd_on_pv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2017 The etcd-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
"os"
"testing"

"github.com/coreos/etcd-operator/test/e2e/e2eutil"
"github.com/coreos/etcd-operator/test/e2e/framework"
)

// TestCreateClusterWithPV creates a 3-member etcd cluster configured through
// e2eutil.AddPV with the framework's storage class, waits until the cluster
// reports size 3, and deletes the cluster when the test returns.
func TestCreateClusterWithPV(t *testing.T) {
	if os.Getenv(envParallelTest) == envParallelTestTrue {
		t.Parallel()
	}

	fw := framework.Global
	spec := e2eutil.NewCluster("test-etcd-", 3)
	e2eutil.AddPV(spec, fw.StorageClassName)

	created, err := e2eutil.CreateCluster(t, fw.CRClient, fw.Namespace, spec)
	if err != nil {
		t.Fatal(err)
	}

	// Registered only after a successful create, so cleanup always has a
	// cluster to delete.
	defer func() {
		delErr := e2eutil.DeleteCluster(t, fw.CRClient, fw.KubeClient, created)
		if delErr != nil {
			t.Fatal(delErr)
		}
	}()

	_, err = e2eutil.WaitUntilSizeReached(t, fw.CRClient, 3, 30, created)
	if err != nil {
		t.Fatalf("failed to create 3 members etcd cluster: %v", err)
	}
}

0 comments on commit 684cfe7

Please sign in to comment.