Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always create missing stateful sets #148

Merged
merged 4 commits into from
Jun 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/controller/add_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ func TestEnsureService_Base(t *testing.T) {
func TestEnsureConfigMap(t *testing.T) {
cluster := getFixture("cluster-simple.yaml", t)
deps := newTestDeps(t, &testOpts{})
controller := deps.newController(t)
defer deps.cleanup()

require.NoError(t, registerValidConfigMap("my_config_data"))

controller := deps.newController()
err := controller.ensureConfigMap(cluster)
assert.NoError(t, err)

Expand All @@ -99,12 +99,11 @@ func TestEnsureConfigMap(t *testing.T) {
func TestEnsureConfigMap_Update(t *testing.T) {
cluster := getFixture("cluster-simple.yaml", t)
deps := newTestDeps(t, &testOpts{})
controller := deps.newController(t)
defer deps.cleanup()

require.NoError(t, registerValidConfigMap("my_config_data"))

controller := deps.newController()

err := controller.ensureConfigMap(cluster)
assert.NoError(t, err)

Expand Down
14 changes: 12 additions & 2 deletions pkg/controller/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
crdfake "github.com/m3db/m3db-operator/pkg/client/clientset/versioned/fake"
crdinformers "github.com/m3db/m3db-operator/pkg/client/informers/externalversions"
crdlisters "github.com/m3db/m3db-operator/pkg/client/listers/m3dboperator/v1alpha1"
"github.com/m3db/m3db-operator/pkg/k8sops"
"github.com/m3db/m3db-operator/pkg/k8sops/podidentity"
"github.com/m3db/m3db-operator/pkg/m3admin/namespace"
"github.com/m3db/m3db-operator/pkg/m3admin/placement"
Expand Down Expand Up @@ -70,20 +71,29 @@ type testDeps struct {
closed int32
}

func (deps *testDeps) newController() *Controller {
func (deps *testDeps) newController(t *testing.T) *Controller {
logger := zap.NewNop()
m := newMultiAdminClient(nil, zap.NewNop())
m.nsClientFn = func(...namespace.Option) (namespace.Client, error) {
return deps.namespaceClient, nil
}
m.plClientFn = func(...placement.Option) (placement.Client, error) {
return deps.placementClient, nil
}
k8sopsClient, err := k8sops.New(
k8sops.WithKClient(deps.kubeClient),
k8sops.WithCRDClient(deps.crdClient),
k8sops.WithLogger(logger))

require.NoError(t, err)

return &Controller{
logger: zap.NewNop(),
logger: logger,
scope: tally.NoopScope,
clock: deps.clock,
adminClient: m,

k8sclient: k8sopsClient,
kubeClient: deps.kubeClient,
crdClient: deps.crdClient,
podIDProvider: deps.idProvider,
Expand Down
36 changes: 18 additions & 18 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,9 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error {
return err
}

childrenSetsByName := make(map[string]*appsv1.StatefulSet)
for _, sts := range childrenSets {
childrenSetsByName[sts.Name] = sts
// if any of the statefulsets aren't ready, wait until they are as we'll get
// another event (ready == bootstrapped)
if sts.Spec.Replicas != nil && *sts.Spec.Replicas != sts.Status.ReadyReplicas {
Expand All @@ -427,26 +429,25 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error {
}
}

// At this point all existing statefulsets are bootstrapped.

if len(childrenSets) != len(isoGroups) {
nextID := len(childrenSets)
// create a statefulset
// Create any missing statefulsets, at this point all existing stateful sets are bootstrapped.
for i := 0; i < len(isoGroups); i++ {
name := k8sops.StatefulSetName(cluster.Name, i)
_, exists := childrenSetsByName[name]
if !exists {
sts, err := k8sops.GenerateStatefulSet(cluster, isoGroups[i].Name, isoGroups[i].NumInstances)
if err != nil {
return err
}

name := fmt.Sprintf("%s-%d", cluster.Name, nextID)
sts, err := k8sops.GenerateStatefulSet(cluster, isoGroups[nextID].Name, isoGroups[nextID].NumInstances)
if err != nil {
return err
}
_, err = c.kubeClient.AppsV1().StatefulSets(cluster.Namespace).Create(sts)
if err != nil {
c.logger.Error(err.Error())
return err
}

_, err = c.kubeClient.AppsV1().StatefulSets(cluster.Namespace).Create(sts)
if err != nil {
c.logger.Error(err.Error())
return err
c.logger.Info("created statefulset", zap.String("name", name))
return nil
}

c.logger.Info("created statefulset", zap.String("name", name))
return nil
}

if err := c.reconcileNamespaces(cluster); err != nil {
Expand Down Expand Up @@ -488,7 +489,6 @@ func (c *Controller) handleClusterUpdate(cluster *myspec.M3DBCluster) error {
if !inst.IsAvailable() {
unavailInsts = append(unavailInsts, inst.ID())
}

}

if ln := len(unavailInsts); ln > 0 {
Expand Down
157 changes: 151 additions & 6 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@
package controller

import (
"fmt"
"sort"
"sync"
"testing"
"time"

myspec "github.com/m3db/m3db-operator/pkg/apis/m3dboperator/v1alpha1"
clientsetfake "github.com/m3db/m3db-operator/pkg/client/clientset/versioned/fake"
m3dbinformers "github.com/m3db/m3db-operator/pkg/client/informers/externalversions"
"github.com/m3db/m3db-operator/pkg/k8sops"
"github.com/m3db/m3db-operator/pkg/k8sops/labels"
"github.com/m3db/m3db-operator/pkg/k8sops/podidentity"

"github.com/m3db/m3/src/cluster/placement"
Expand All @@ -41,6 +44,8 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
kubefake "k8s.io/client-go/kubernetes/fake"
ktesting "k8s.io/client-go/testing"
kubetesting "k8s.io/client-go/testing"

"github.com/golang/mock/gomock"
pkgerrors "github.com/pkg/errors"
Expand Down Expand Up @@ -139,7 +144,7 @@ func TestGetChildStatefulSets(t *testing.T) {
kubeObjects: objects,
})

c := deps.newController()
c := deps.newController(t)

// Ensure that with no owner references we don't act on these sets
children, err := c.getChildStatefulSets(cluster)
Expand Down Expand Up @@ -237,10 +242,9 @@ func TestGetParentCluster(t *testing.T) {
cluster,
},
})
c := deps.newController(t)
defer deps.cleanup()

c := deps.newController()

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "namespace",
Expand Down Expand Up @@ -277,8 +281,8 @@ func TestHandlePodUpdate(t *testing.T) {
crdObjects: []runtime.Object{cluster},
kubeObjects: []runtime.Object{pod1},
})
c := deps.newController(t)
defer deps.cleanup()
c := deps.newController()

mockID := &myspec.PodIdentity{
Name: "pod1",
Expand All @@ -302,7 +306,8 @@ func TestHandlePodUpdate(t *testing.T) {
func TestClusterEventLoop(t *testing.T) {
deps := newTestDeps(t, &testOpts{})
defer deps.cleanup()
c := deps.newController()

c := deps.newController(t)

cluster := &myspec.M3DBCluster{
ObjectMeta: newObjectMeta("foo", nil),
Expand Down Expand Up @@ -335,7 +340,8 @@ func TestClusterEventLoop(t *testing.T) {
func TestPodEventLoop(t *testing.T) {
deps := newTestDeps(t, &testOpts{})
defer deps.cleanup()
c := deps.newController()

c := deps.newController(t)

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -415,3 +421,142 @@ func TestValidateIsolationGroups(t *testing.T) {
}
}
}

func TestHandleUpdateClusterCreatesStatefulSets(t *testing.T) {
tests := []struct {
name string
cluster *metav1.ObjectMeta
sets []*metav1.ObjectMeta
replicationFactor int
expCreateStatefulSets []string
}{
{
name: "creates missing stateful sets at tail",
cluster: newMeta("cluster1", map[string]string{
"foo": "bar",
"operator.m3db.io/app": "m3db",
"operator.m3db.io/cluster": "cluster1",
}),
sets: []*metav1.ObjectMeta{
newMeta("cluster1-rep0", nil),
},
replicationFactor: 3,
expCreateStatefulSets: []string{"cluster1-rep1", "cluster1-rep2"},
},
{
name: "creates missing stateful sets at head and tail",
cluster: newMeta("cluster1", map[string]string{
"foo": "bar",
"operator.m3db.io/app": "m3db",
"operator.m3db.io/cluster": "cluster1",
}),
sets: []*metav1.ObjectMeta{
newMeta("cluster1-rep1", nil),
},
replicationFactor: 3,
expCreateStatefulSets: []string{"cluster1-rep0", "cluster1-rep2"},
},
{
name: "creates missing stateful sets at head",
cluster: newMeta("cluster1", map[string]string{
"foo": "bar",
"operator.m3db.io/app": "m3db",
"operator.m3db.io/cluster": "cluster1",
}),
sets: []*metav1.ObjectMeta{
newMeta("cluster1-rep2", nil),
},
replicationFactor: 3,
expCreateStatefulSets: []string{"cluster1-rep0", "cluster1-rep1"},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cfgMapName := "configMapName"
cluster := &myspec.M3DBCluster{
ObjectMeta: *test.cluster,
Spec: myspec.ClusterSpec{
ReplicationFactor: int32(test.replicationFactor),
ConfigMapName: &cfgMapName,
},
}
cluster.ObjectMeta.UID = "abcd"
for i := 0; i < test.replicationFactor; i++ {
group := myspec.IsolationGroup{
Name: fmt.Sprintf("group%d", i),
NumInstances: 1,
}
cluster.Spec.IsolationGroups = append(cluster.Spec.IsolationGroups, group)
}
cluster.ObjectMeta.Finalizers = []string{labels.EtcdDeletionFinalizer}

objects := make([]runtime.Object, len(test.sets))
statefulSets := make([]*appsv1.StatefulSet, len(test.sets))
for i, s := range test.sets {
set := &appsv1.StatefulSet{
ObjectMeta: *s,
}
// Apply base labels
if set.ObjectMeta.Labels == nil {
set.ObjectMeta.Labels = make(map[string]string)
}
for k, v := range labels.BaseLabels(cluster) {
set.ObjectMeta.Labels[k] = v
}
statefulSets[i] = set
objects[i] = set
set.OwnerReferences = []metav1.OwnerReference{
*metav1.NewControllerRef(cluster, schema.GroupVersionKind{
Group: myspec.SchemeGroupVersion.Group,
Version: myspec.SchemeGroupVersion.Version,
Kind: "m3dbcluster",
}),
}
}

deps := newTestDeps(t, &testOpts{
crdObjects: []runtime.Object{
&myspec.M3DBCluster{
ObjectMeta: *test.cluster,
},
},
kubeObjects: objects,
})
defer deps.cleanup()
c := deps.newController(t)

// Keep running cluster updates until no more stateful sets required
var expectedMu sync.Mutex
expectedSetsCreated := make(map[string]bool)
for _, name := range test.expCreateStatefulSets {
expectedSetsCreated[name] = false
}

c.kubeClient.(*kubefake.Clientset).PrependReactor("create", "statefulsets", func(action ktesting.Action) (bool, runtime.Object, error) {
cluster := action.(kubetesting.CreateActionImpl).GetObject().(*appsv1.StatefulSet)
name := cluster.Name
expectedMu.Lock()
expectedSetsCreated[name] = true
expectedMu.Unlock()
return true, cluster, nil
})

var done bool
for i := 0; i < 5; i++ {
err := c.handleClusterUpdate(cluster)
require.NoError(t, err)

expectedMu.Lock()
created := len(expectedSetsCreated)
expectedMu.Unlock()
if created != len(test.expCreateStatefulSets) {
time.Sleep(100 * time.Millisecond)
continue
}
done = true
}
assert.True(t, done, "expected all sets to be created")
})
}
}
Loading