diff --git a/pkg/apis/etcd/v1beta2/cluster.go b/pkg/apis/etcd/v1beta2/cluster.go index 702aecbe4..db49e94f2 100644 --- a/pkg/apis/etcd/v1beta2/cluster.go +++ b/pkg/apis/etcd/v1beta2/cluster.go @@ -138,6 +138,12 @@ type PodPolicy struct { // bootstrap the cluster (for example `--initial-cluster` flag). // This field cannot be updated. EtcdEnv []v1.EnvVar `json:"etcdEnv,omitempty"` + + // PersistentVolumeClaimSpec is the spec to describe PVC for the etcd container + // This field is optional. If no PVC spec, etcd container will use emptyDir as volume + // Note. This feature is in alpha stage. It is currently only used as non-stable storage, + // not the stable storage. Future work need to make it used as stable storage. + PersistentVolumeClaimSpec *v1.PersistentVolumeClaimSpec `json:"persistentVolumeClaimSpec,omitempty"` } func (c *ClusterSpec) Validate() error { diff --git a/pkg/apis/etcd/v1beta2/zz_generated.deepcopy.go b/pkg/apis/etcd/v1beta2/zz_generated.deepcopy.go index 7df704055..3957a5b27 100644 --- a/pkg/apis/etcd/v1beta2/zz_generated.deepcopy.go +++ b/pkg/apis/etcd/v1beta2/zz_generated.deepcopy.go @@ -563,6 +563,15 @@ func (in *PodPolicy) DeepCopyInto(out *PodPolicy) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.PersistentVolumeClaimSpec != nil { + in, out := &in.PersistentVolumeClaimSpec, &out.PersistentVolumeClaimSpec + if *in == nil { + *out = nil + } else { + *out = new(v1.PersistentVolumeClaimSpec) + (*in).DeepCopyInto(*out) + } + } return } diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 19166b804..08f3b95f8 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -375,9 +375,26 @@ func (c *Cluster) setupServices() error { return k8sutil.CreatePeerService(c.config.KubeCli, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner()) } +func (c *Cluster) isPodPVEnabled() bool { + if podPolicy := c.cluster.Spec.Pod; podPolicy != nil { + return podPolicy.PersistentVolumeClaimSpec != nil + } + return false +} + func (c *Cluster) createPod(members etcdutil.MemberSet, m *etcdutil.Member, state string) error { pod := k8sutil.NewEtcdPod(m, members.PeerURLPairs(), c.cluster.Name, state, uuid.New(), c.cluster.Spec, c.cluster.AsOwner()) - _, err := c.config.KubeCli.Core().Pods(c.cluster.Namespace).Create(pod) + if c.isPodPVEnabled() { + pvc := k8sutil.NewEtcdPodPVC(m, *c.cluster.Spec.Pod.PersistentVolumeClaimSpec, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner()) + _, err := c.config.KubeCli.CoreV1().PersistentVolumeClaims(c.cluster.Namespace).Create(pvc) + if err != nil { + return fmt.Errorf("failed to create PVC for member (%s): %v", m.Name, err) + } + k8sutil.AddEtcdVolumeToPod(pod, pvc) + } else { + k8sutil.AddEtcdVolumeToPod(pod, nil) + } + _, err := c.config.KubeCli.CoreV1().Pods(c.cluster.Namespace).Create(pod) return err } diff --git a/pkg/cluster/reconcile.go b/pkg/cluster/reconcile.go index 5e6821a59..f6568072a 100644 --- a/pkg/cluster/reconcile.go +++ b/pkg/cluster/reconcile.go @@ -193,14 +193,19 @@ func (c *Cluster) removeDeadMember(toRemove *etcdutil.Member) error { return c.removeMember(toRemove) } -func (c *Cluster) removeMember(toRemove *etcdutil.Member) error { - err := etcdutil.RemoveMember(c.members.ClientURLs(), c.tlsConfig, toRemove.ID) +func (c *Cluster) removeMember(toRemove *etcdutil.Member) (err error) { + defer func() { + if err != nil { + err = fmt.Errorf("remove member (%s) failed: %v", toRemove.Name, err) + } + }() + + err = etcdutil.RemoveMember(c.members.ClientURLs(), c.tlsConfig, toRemove.ID) if err != nil { switch err { case rpctypes.ErrMemberNotFound: c.logger.Infof("etcd member (%v) has been removed", toRemove.Name) default: - c.logger.Errorf("fail to remove etcd member (%v): %v", toRemove.Name, err) return err } } @@ -212,10 +217,24 @@ func (c *Cluster) removeMember(toRemove *etcdutil.Member) error { if err := c.removePod(toRemove.Name); err != nil { return err } + if c.isPodPVEnabled() { + err = c.removePVC(k8sutil.PVCNameFromMember(toRemove.Name)) + if err != nil { + return err + } + } c.logger.Infof("removed member (%v) with ID (%d)", toRemove.Name, toRemove.ID) return nil } +func (c *Cluster) removePVC(pvcName string) error { + err := c.config.KubeCli.Core().PersistentVolumeClaims(c.cluster.Namespace).Delete(pvcName, nil) + if err != nil && !k8sutil.IsKubernetesResourceNotFoundError(err) { + return fmt.Errorf("remove pvc (%s) failed: %v", pvcName, err) + } + return nil +} + func needUpgrade(pods []*v1.Pod, cs api.ClusterSpec) bool { return len(pods) == cs.Size && pickOneOldMember(pods, cs.Version) != nil } diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index fa10b7a95..a69d9dfa2 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -78,6 +78,11 @@ func GetPodNames(pods []*v1.Pod) []string { return res } +// PVCNameFromMember the way we get PVC name from the member name +func PVCNameFromMember(memberName string) string { + return memberName +} + func makeRestoreInitContainers(backupURL *url.URL, token, repo, version string, m *etcdutil.Member) []v1.Container { return []v1.Container{ { @@ -209,6 +214,19 @@ func newEtcdServiceManifest(svcName, clusterName, clusterIP string, ports []v1.S return svc } +// AddEtcdVolumeToPod abstract the process of appending volume spec to pod spec +func AddEtcdVolumeToPod(pod *v1.Pod, pvc *v1.PersistentVolumeClaim) { + vol := v1.Volume{Name: etcdVolumeName} + if pvc != nil { + vol.VolumeSource = v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: pvc.Name}, + } + } else { + vol.VolumeSource = v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}} + } + pod.Spec.Volumes = append(pod.Spec.Volumes, vol) +} + func addRecoveryToPod(pod *v1.Pod, token string, m *etcdutil.Member, cs api.ClusterSpec, backupURL *url.URL) { pod.Spec.InitContainers = append(pod.Spec.InitContainers, makeRestoreInitContainers(backupURL, token, cs.Repository, cs.Version, m)...) @@ -223,6 +241,8 @@ func addOwnerRefToObject(o metav1.Object, r metav1.OwnerReference) { func NewSeedMemberPod(clusterName string, ms etcdutil.MemberSet, m *etcdutil.Member, cs api.ClusterSpec, owner metav1.OwnerReference, backupURL *url.URL) *v1.Pod { token := uuid.New() pod := newEtcdPod(m, ms.PeerURLPairs(), clusterName, "new", token, cs) + // TODO: PVC datadir support for restore process + AddEtcdVolumeToPod(pod, nil) if backupURL != nil { addRecoveryToPod(pod, token, m, cs, backupURL) } @@ -231,6 +251,20 @@ func NewSeedMemberPod(clusterName string, ms etcdutil.MemberSet, m *etcdutil.Mem return pod } +// NewEtcdPodPVC create PVC object from etcd pod's PVC spec +func NewEtcdPodPVC(m *etcdutil.Member, pvcSpec v1.PersistentVolumeClaimSpec, clusterName, namespace string, owner metav1.OwnerReference) *v1.PersistentVolumeClaim { + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: PVCNameFromMember(m.Name), + Namespace: namespace, + Labels: LabelsForCluster(clusterName), + }, + Spec: pvcSpec, + } + addOwnerRefToObject(pvc.GetObjectMeta(), owner) + return pvc +} + func newEtcdPod(m *etcdutil.Member, initialCluster []string, clusterName, state, token string, cs api.ClusterSpec) *v1.Pod { commands := fmt.Sprintf("/usr/local/bin/etcd --data-dir=%s --name=%s --initial-advertise-peer-urls=%s "+ "--listen-peer-urls=%s --listen-client-urls=%s --advertise-client-urls=%s "+ @@ -264,9 +298,7 @@ func newEtcdPod(m *etcdutil.Member, initialCluster []string, clusterName, state, livenessProbe, readinessProbe) - volumes := []v1.Volume{ - {Name: "etcd-data", VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}}, - } + volumes := []v1.Volume{} if m.SecurePeer { container.VolumeMounts = append(container.VolumeMounts, v1.VolumeMount{ diff --git a/test/e2e/pv_test.go b/test/e2e/pv_test.go new file mode 100644 index 000000000..36b90d673 --- /dev/null +++ b/test/e2e/pv_test.go @@ -0,0 +1,59 @@ +// Copyright 2017 The etcd-operator Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "os" + "testing" + + api "github.com/coreos/etcd-operator/pkg/apis/etcd/v1beta2" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + + "github.com/coreos/etcd-operator/test/e2e/e2eutil" + "github.com/coreos/etcd-operator/test/e2e/framework" +) + +func TestCreateClusterWithPV(t *testing.T) { + if os.Getenv(envParallelTest) == envParallelTestTrue { + t.Parallel() + } + f := framework.Global + c := e2eutil.NewCluster("test-etcd-", 3) + c.Spec.Pod = &api.PodPolicy{ + PersistentVolumeClaimSpec: &v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("512Mi")}, + }, + StorageClassName: func(s string) *string { return &s }("standard"), + }, + } + + testEtcd, err := e2eutil.CreateCluster(t, f.CRClient, f.Namespace, c) + if err != nil { + t.Fatal(err) + } + + defer func() { + if err := e2eutil.DeleteCluster(t, f.CRClient, f.KubeClient, testEtcd); err != nil { + t.Fatal(err) + } + }() + + if _, err := e2eutil.WaitUntilSizeReached(t, f.CRClient, 3, 30, testEtcd); err != nil { + t.Fatalf("failed to create 3 members etcd cluster: %v", err) + } +}