Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batch node removal #314

Merged
merged 14 commits into from
Mar 10, 2022
2 changes: 1 addition & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ func (c *M3DBController) handleClusterUpdate(
// trigger a remove so that we can shrink the set.
if inPlacement > desired {
setLogger.Info("remove instance from placement for set")
return c.shrinkPlacementForSet(cluster, set, placement)
return c.shrinkPlacementForSet(cluster, set, placement, int(inPlacement-desired))
}

var newCount int32
Expand Down
39 changes: 28 additions & 11 deletions pkg/controller/update_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ func (c *M3DBController) expandPlacementForSet(
// removes the last pod in the StatefulSet from the active placement, enabling
// the StatefulSet size to be decreased once the remove completes.
func (c *M3DBController) shrinkPlacementForSet(
cluster *myspec.M3DBCluster, set *appsv1.StatefulSet, pl placement.Placement,
cluster *myspec.M3DBCluster, set *appsv1.StatefulSet, pl placement.Placement, removeCount int,
) error {
if cluster.Spec.PreventScaleDown {
return pkgerrors.Errorf("cannot remove nodes from %s/%s, preventScaleDown is true",
Expand All @@ -556,25 +556,36 @@ func (c *M3DBController) shrinkPlacementForSet(
return err
}

_, removeInst, err := c.findPodInstanceToRemove(cluster, pl, pods)
_, removeInst, err := c.findPodInstancesToRemove(cluster, pl, pods, removeCount)
if err != nil {
c.logger.Error("error finding pod to remove", zap.Error(err))
return err
}

c.logger.Info("removing pod from placement", zap.String("instance", removeInst.ID()))
return c.adminClient.placementClientForCluster(cluster).Remove([]string{removeInst.ID()})
if len(removeInst) == 0 {
c.logger.Info("nothing to remove, skipping remove call")
return nil
}

removeIds := make([]string, len(removeInst))
for idx, inst := range removeInst {
removeIds[idx] = inst.ID()
}
c.logger.Info("removing instances from placement",
zap.String("instances", strings.Join(removeIds, ",")))
return c.adminClient.placementClientForCluster(cluster).Remove(removeIds)
}

// findPodInstanceToRemove returns the pod (and associated placement instace)
// findPodInstancesToRemove returns the pod (and associated placement instace)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: s/pod/pods

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can you update the comment to reflect that this will return multiple pods / instances now?

// with the highest ordinal number in the stateful set AND in the placement, so
// that we remove from the placement the pod that will be deleted when the set
// size is scaled down.
func (c *M3DBController) findPodInstanceToRemove(
func (c *M3DBController) findPodInstancesToRemove(
cluster *myspec.M3DBCluster,
pl placement.Placement,
pods []*corev1.Pod,
) (*corev1.Pod, placement.Instance, error) {
removeCount int,
) ([]*corev1.Pod, []placement.Instance, error) {
if len(pods) == 0 {
return nil, nil, errEmptyPodList
}
Expand All @@ -584,7 +595,12 @@ func (c *M3DBController) findPodInstanceToRemove(
return nil, nil, pkgerrors.WithMessage(err, "cannot sort pods")
}

for i := len(podIDs) - 1; i >= 0; i-- {
var (
podsToRemove []*corev1.Pod
instancesToRemove []placement.Instance
leftToRemove = removeCount
)
for i := len(podIDs) - 1; i >= 0 && leftToRemove > 0; i-- {
pod := podIDs[i].pod
inst, err := c.findPodInPlacement(cluster, pl, pod)
if pkgerrors.Cause(err) == errPodNotInPlacement {
Expand All @@ -595,10 +611,11 @@ func (c *M3DBController) findPodInstanceToRemove(
if err != nil {
return nil, nil, pkgerrors.WithMessage(err, "error finding pod in placement")
}
return pod, inst, nil
leftToRemove--
podsToRemove = append(podsToRemove, pod)
instancesToRemove = append(instancesToRemove, inst)
}

return nil, nil, errNoPodsInPlacement
return podsToRemove, instancesToRemove, nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To ensure this is doing what the user intends, do we want to return an error if len(podsToRemove) != removeCount? Not sure if there's a case where that could happen.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only case where this can happen is if we somehow want to remove more than exists. There is a test case when removeCount is for 4 but we have only 3 instances in placement.

In practice this should not happen because we remove only if placement contains more than desired:

		if inPlacement > desired {
			setLogger.Info("remove instance from placement for set")
			return c.shrinkPlacementForSet(cluster, set, placement, int(inPlacement-desired))
		}

. Maybe if we spam expand/shrink we could enter some weird state here. But I think it's hard to reason what will happen anyways.

Another approach that potentially can be safer is to instead of having "removeCount" we could pass in "desired" so that we try to reach target state, we could than assert if target state makes sense like "desired" > "currentPodCount" otherwise it looks like expansion.

}

// findPodInPlacement looks up a pod in the placement. Equality is based on
Expand Down
151 changes: 99 additions & 52 deletions pkg/controller/update_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,54 +678,101 @@ func TestExpandPlacementForSet_Err(t *testing.T) {
}

func TestShrinkPlacementForSet(t *testing.T) {
cluster := getFixture("cluster-3-zones.yaml", t)

set, err := m3db.GenerateStatefulSet(cluster, "us-fake1-a", 3)
require.NoError(t, err)

pods := podsForClusterSet(cluster, set, 3)
deps := newTestDeps(t, &testOpts{
kubeObjects: objectsFromPods(pods...),
})
placementMock := deps.placementClient
controller := deps.newController(t)
defer deps.cleanup()

identifyPods(deps.idProvider, pods, nil)
pl := placementFromPods(t, cluster, pods, deps.idProvider)

// Expect the last pod to be removed.
placementMock.EXPECT().Remove([]string{`{"name":"cluster-zones-rep0-2","uid":"2"}`})
err = controller.shrinkPlacementForSet(cluster, set, pl)
assert.NoError(t, err)

// If there are more pods in the set then in the placement, we expect the last
// in the set to be removed.
pl = placementFromPods(t, cluster, pods[:2], deps.idProvider)
placementMock.EXPECT().Remove([]string{`{"name":"cluster-zones-rep0-1","uid":"1"}`})
err = controller.shrinkPlacementForSet(cluster, set, pl)
assert.NoError(t, err)
}
tests := []struct {
name string
removeCount int
placementPodsCount int
preventScaleDown bool
expectedRemovedIds []string
expectedErr string
}{
{
name: "remove single pod",
removeCount: 1,
placementPodsCount: 3,
expectedRemovedIds: []string{`{"name":"cluster-zones-rep0-2","uid":"2"}`},
},
{
name: "empty placement",
removeCount: 1,
placementPodsCount: 0,
expectedRemovedIds: nil,
},
{
name: "remove multiple pods",
removeCount: 2,
placementPodsCount: 3,
expectedRemovedIds: []string{
`{"name":"cluster-zones-rep0-2","uid":"2"}`,
`{"name":"cluster-zones-rep0-1","uid":"1"}`,
},
},
{
name: "remove last when placement contains less instances than pods",
removeCount: 1,
placementPodsCount: 2,
expectedRemovedIds: []string{`{"name":"cluster-zones-rep0-1","uid":"1"}`},
},
{
name: "remove more pods than exists",
removeCount: 4,
placementPodsCount: 3,
expectedRemovedIds: []string{
`{"name":"cluster-zones-rep0-2","uid":"2"}`,
`{"name":"cluster-zones-rep0-1","uid":"1"}`,
`{"name":"cluster-zones-rep0-0","uid":"0"}`,
},
},
{
name: "remove all pods",
removeCount: 3,
placementPodsCount: 3,
expectedRemovedIds: []string{
`{"name":"cluster-zones-rep0-2","uid":"2"}`,
`{"name":"cluster-zones-rep0-1","uid":"1"}`,
`{"name":"cluster-zones-rep0-0","uid":"0"}`,
},
},
{
name: "prevent scale down",
preventScaleDown: true,
expectedErr: "cannot remove nodes from fake/cluster-zones, preventScaleDown is true",
expectedRemovedIds: nil,
},
}

func TestShrinkPlacementForSet_PreventScaleDown(t *testing.T) {
cluster := getFixture("cluster-3-zones.yaml", t)
cluster.Spec.PreventScaleDown = true
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
cluster := getFixture("cluster-3-zones.yaml", t)
cluster.Spec.PreventScaleDown = tc.preventScaleDown

set, err := m3db.GenerateStatefulSet(cluster, "us-fake1-a", 3)
require.NoError(t, err)
set, err := m3db.GenerateStatefulSet(cluster, "us-fake1-a", 3)
require.NoError(t, err)

pods := podsForClusterSet(cluster, set, 3)
deps := newTestDeps(t, &testOpts{
kubeObjects: objectsFromPods(pods...),
})
controller := deps.newController(t)
defer deps.cleanup()
pods := podsForClusterSet(cluster, set, 3)
deps := newTestDeps(t, &testOpts{
kubeObjects: objectsFromPods(pods...),
})
placementMock := deps.placementClient
controller := deps.newController(t)
defer deps.cleanup()

identifyPods(deps.idProvider, pods, nil)
pl := placementFromPods(t, cluster, pods, deps.idProvider)
identifyPods(deps.idProvider, pods, nil)
pl := placementFromPods(t, cluster, pods[:tc.placementPodsCount], deps.idProvider)

err = controller.shrinkPlacementForSet(cluster, set, pl)
assert.EqualError(t, err, "cannot remove nodes from fake/cluster-zones, preventScaleDown is true")
if len(tc.expectedRemovedIds) > 0 {
placementMock.EXPECT().Remove(tc.expectedRemovedIds)
}
err = controller.shrinkPlacementForSet(cluster, set, pl, tc.removeCount)
if tc.expectedErr != "" {
require.Error(t, err)
assert.Contains(t, err.Error(), tc.expectedErr)
} else {
assert.NoError(t, err)
}
})
}
}

func podWithName(name string) *corev1.Pod {
Expand Down Expand Up @@ -1125,28 +1172,28 @@ func TestFindPodToRemove(t *testing.T) {
assert.Equal(t, pl.Instances()[0], inst)

// Can't remove from no pods.
_, _, err = controller.findPodInstanceToRemove(cluster, pl, nil)
_, _, err = controller.findPodInstancesToRemove(cluster, pl, nil, 1)
assert.Equal(t, errEmptyPodList, err)

// Can't remove from malformed pod names.
_, _, err = controller.findPodInstanceToRemove(cluster, pl, []*corev1.Pod{
_, _, err = controller.findPodInstancesToRemove(cluster, pl, []*corev1.Pod{
podWithName("foo"),
})
}, 1)
assert.Contains(t, err.Error(), "cannot sort pods")

// Removing from a placement w/ all pods removes the last.
pod, inst, err := controller.findPodInstanceToRemove(cluster, pl, pods)
podsToRemove, insts, err := controller.findPodInstancesToRemove(cluster, pl, pods, 1)
assert.NoError(t, err)
assert.Equal(t, pods[2], pod)
assert.Equal(t, pl.Instances()[2], inst)
assert.Equal(t, pods[2], podsToRemove[0])
assert.Equal(t, pl.Instances()[2], insts[0])

// Removing from a placement w/ 2 insts and 3 pods removes the last pod that's
// still in the placement.
pl = placementFromPods(t, cluster, pods[:2], idProvider)
pod, inst, err = controller.findPodInstanceToRemove(cluster, pl, pods)
podsToRemove, insts, err = controller.findPodInstancesToRemove(cluster, pl, pods, 1)
assert.NoError(t, err)
assert.Equal(t, pods[1], pod)
assert.Equal(t, pl.Instances()[1], inst)
assert.Equal(t, pods[1], podsToRemove[0])
assert.Equal(t, pl.Instances()[1], insts[0])
}

func TestEtcdFinalizer(t *testing.T) {
Expand Down