Skip to content
This repository has been archived by the owner on Mar 28, 2020. It is now read-only.

Add PersistVolumne Support #1861

Merged
merged 9 commits into from
Jan 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/apis/etcd/v1beta2/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ type PodPolicy struct {
// bootstrap the cluster (for example `--initial-cluster` flag).
// This field cannot be updated.
EtcdEnv []v1.EnvVar `json:"etcdEnv,omitempty"`

// PersistentVolumeClaimSpec is the spec to describe PVC for the etcd container
// This field is optional. If no PVC spec, etcd container will use emptyDir as volume
// Note. This feature is in alpha stage. It is currently only used as non-stable storage,
// not the stable storage. Future work need to make it used as stable storage.
PersistentVolumeClaimSpec *v1.PersistentVolumeClaimSpec `json:"persistentVolumeClaimSpec,omitempty"`
}

func (c *ClusterSpec) Validate() error {
Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/etcd/v1beta2/zz_generated.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,15 @@ func (in *PodPolicy) DeepCopyInto(out *PodPolicy) {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.PersistentVolumeClaimSpec != nil {
in, out := &in.PersistentVolumeClaimSpec, &out.PersistentVolumeClaimSpec
if *in == nil {
*out = nil
} else {
*out = new(v1.PersistentVolumeClaimSpec)
(*in).DeepCopyInto(*out)
}
}
return
}

Expand Down
19 changes: 18 additions & 1 deletion pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,9 +375,26 @@ func (c *Cluster) setupServices() error {
return k8sutil.CreatePeerService(c.config.KubeCli, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner())
}

func (c *Cluster) isPodPVEnabled() bool {
if podPolicy := c.cluster.Spec.Pod; podPolicy != nil {
return podPolicy.PersistentVolumeClaimSpec != nil
}
return false
}

func (c *Cluster) createPod(members etcdutil.MemberSet, m *etcdutil.Member, state string) error {
pod := k8sutil.NewEtcdPod(m, members.PeerURLPairs(), c.cluster.Name, state, uuid.New(), c.cluster.Spec, c.cluster.AsOwner())
_, err := c.config.KubeCli.Core().Pods(c.cluster.Namespace).Create(pod)
if c.isPodPVEnabled() {
pvc := k8sutil.NewEtcdPodPVC(m, *c.cluster.Spec.Pod.PersistentVolumeClaimSpec, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner())
_, err := c.config.KubeCli.CoreV1().PersistentVolumeClaims(c.cluster.Namespace).Create(pvc)
if err != nil {
return fmt.Errorf("failed to create PVC for member (%s): %v", m.Name, err)
}
k8sutil.AddEtcdVolumeToPod(pod, pvc)
} else {
k8sutil.AddEtcdVolumeToPod(pod, nil)
}
_, err := c.config.KubeCli.CoreV1().Pods(c.cluster.Namespace).Create(pod)
return err
}

Expand Down
25 changes: 22 additions & 3 deletions pkg/cluster/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,19 @@ func (c *Cluster) removeDeadMember(toRemove *etcdutil.Member) error {
return c.removeMember(toRemove)
}

func (c *Cluster) removeMember(toRemove *etcdutil.Member) error {
err := etcdutil.RemoveMember(c.members.ClientURLs(), c.tlsConfig, toRemove.ID)
func (c *Cluster) removeMember(toRemove *etcdutil.Member) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("remove member (%s) failed: %v", toRemove.Name, err)
}
}()

err = etcdutil.RemoveMember(c.members.ClientURLs(), c.tlsConfig, toRemove.ID)
if err != nil {
switch err {
case rpctypes.ErrMemberNotFound:
c.logger.Infof("etcd member (%v) has been removed", toRemove.Name)
default:
c.logger.Errorf("fail to remove etcd member (%v): %v", toRemove.Name, err)
return err
}
}
Expand All @@ -212,10 +217,24 @@ func (c *Cluster) removeMember(toRemove *etcdutil.Member) error {
if err := c.removePod(toRemove.Name); err != nil {
return err
}
if c.isPodPVEnabled() {
err = c.removePVC(k8sutil.PVCNameFromMember(toRemove.Name))
if err != nil {
return err
}
}
c.logger.Infof("removed member (%v) with ID (%d)", toRemove.Name, toRemove.ID)
return nil
}

func (c *Cluster) removePVC(pvcName string) error {
err := c.config.KubeCli.Core().PersistentVolumeClaims(c.cluster.Namespace).Delete(pvcName, nil)
if err != nil && !k8sutil.IsKubernetesResourceNotFoundError(err) {
return fmt.Errorf("remove pvc (%s) failed: %v", pvcName, err)
}
return nil
}

func needUpgrade(pods []*v1.Pod, cs api.ClusterSpec) bool {
return len(pods) == cs.Size && pickOneOldMember(pods, cs.Version) != nil
}
Expand Down
38 changes: 35 additions & 3 deletions pkg/util/k8sutil/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ func GetPodNames(pods []*v1.Pod) []string {
return res
}

// PVCNameFromMember the way we get PVC name from the member name
func PVCNameFromMember(memberName string) string {
return memberName
}

func makeRestoreInitContainers(backupURL *url.URL, token, repo, version string, m *etcdutil.Member) []v1.Container {
return []v1.Container{
{
Expand Down Expand Up @@ -209,6 +214,19 @@ func newEtcdServiceManifest(svcName, clusterName, clusterIP string, ports []v1.S
return svc
}

// AddEtcdVolumeToPod abstract the process of appending volume spec to pod spec
func AddEtcdVolumeToPod(pod *v1.Pod, pvc *v1.PersistentVolumeClaim) {
vol := v1.Volume{Name: etcdVolumeName}
if pvc != nil {
vol.VolumeSource = v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: pvc.Name},
}
} else {
vol.VolumeSource = v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}
}
pod.Spec.Volumes = append(pod.Spec.Volumes, vol)
}

func addRecoveryToPod(pod *v1.Pod, token string, m *etcdutil.Member, cs api.ClusterSpec, backupURL *url.URL) {
pod.Spec.InitContainers = append(pod.Spec.InitContainers,
makeRestoreInitContainers(backupURL, token, cs.Repository, cs.Version, m)...)
Expand All @@ -223,6 +241,8 @@ func addOwnerRefToObject(o metav1.Object, r metav1.OwnerReference) {
func NewSeedMemberPod(clusterName string, ms etcdutil.MemberSet, m *etcdutil.Member, cs api.ClusterSpec, owner metav1.OwnerReference, backupURL *url.URL) *v1.Pod {
token := uuid.New()
pod := newEtcdPod(m, ms.PeerURLPairs(), clusterName, "new", token, cs)
// TODO: PVC datadir support for restore process
AddEtcdVolumeToPod(pod, nil)
if backupURL != nil {
addRecoveryToPod(pod, token, m, cs, backupURL)
}
Expand All @@ -231,6 +251,20 @@ func NewSeedMemberPod(clusterName string, ms etcdutil.MemberSet, m *etcdutil.Mem
return pod
}

// NewEtcdPodPVC create PVC object from etcd pod's PVC spec
func NewEtcdPodPVC(m *etcdutil.Member, pvcSpec v1.PersistentVolumeClaimSpec, clusterName, namespace string, owner metav1.OwnerReference) *v1.PersistentVolumeClaim {
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: PVCNameFromMember(m.Name),
Namespace: namespace,
Labels: LabelsForCluster(clusterName),
},
Spec: pvcSpec,
}
addOwnerRefToObject(pvc.GetObjectMeta(), owner)
return pvc
}

func newEtcdPod(m *etcdutil.Member, initialCluster []string, clusterName, state, token string, cs api.ClusterSpec) *v1.Pod {
commands := fmt.Sprintf("/usr/local/bin/etcd --data-dir=%s --name=%s --initial-advertise-peer-urls=%s "+
"--listen-peer-urls=%s --listen-client-urls=%s --advertise-client-urls=%s "+
Expand Down Expand Up @@ -264,9 +298,7 @@ func newEtcdPod(m *etcdutil.Member, initialCluster []string, clusterName, state,
livenessProbe,
readinessProbe)

volumes := []v1.Volume{
{Name: "etcd-data", VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}},
}
volumes := []v1.Volume{}

if m.SecurePeer {
container.VolumeMounts = append(container.VolumeMounts, v1.VolumeMount{
Expand Down
59 changes: 59 additions & 0 deletions test/e2e/pv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2017 The etcd-operator Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
"os"
"testing"

api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/coreos/etcd-operator/test/e2e/e2eutil"
"github.com/coreos/etcd-operator/test/e2e/framework"
)

func TestCreateClusterWithPV(t *testing.T) {
if os.Getenv(envParallelTest) == envParallelTestTrue {
t.Parallel()
}
f := framework.Global
c := e2eutil.NewCluster("test-etcd-", 3)
c.Spec.Pod = &api.PodPolicy{
PersistentVolumeClaimSpec: &v1.PersistentVolumeClaimSpec{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("512Mi")},
},
StorageClassName: func(s string) *string { return &s }("standard"),
},
}

testEtcd, err := e2eutil.CreateCluster(t, f.CRClient, f.Namespace, c)
if err != nil {
t.Fatal(err)
}

defer func() {
if err := e2eutil.DeleteCluster(t, f.CRClient, f.KubeClient, testEtcd); err != nil {
t.Fatal(err)
}
}()

if _, err := e2eutil.WaitUntilSizeReached(t, f.CRClient, 3, 30, testEtcd); err != nil {
t.Fatalf("failed to create 3 members etcd cluster: %v", err)
}
}