Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batch node removal #314

Merged
merged 14 commits into from
Mar 10, 2022
4 changes: 2 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,8 @@ func (c *M3DBController) handleClusterUpdate(
// If there are more pods in the placement than we want in the group,
// trigger a remove so that we can shrink the set.
if inPlacement > desired {
setLogger.Info("remove instance from placement for set")
return c.shrinkPlacementForSet(cluster, set, placement)
setLogger.Info("shrinking placement for set")
return c.shrinkPlacementForSet(cluster, set, placement, int(desired))
}

var newCount int32
Expand Down
51 changes: 35 additions & 16 deletions pkg/controller/update_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,16 +538,20 @@ func (c *M3DBController) expandPlacementForSet(
return c.addPodsToPlacement(ctx, cluster, podsToAdd)
}

// shrinkPlacementForSet takes a StatefulSet that needs to be shrunk and
// removes the last pod in the StatefulSet from the active placement, enabling
// shrinkPlacementForSet takes a StatefulSet that needs to be shrunk and removes any pods
// that are above desired instance count in the StatefulSet from the active placement, enabling
// the StatefulSet size to be decreased once the remove completes.
func (c *M3DBController) shrinkPlacementForSet(
cluster *myspec.M3DBCluster, set *appsv1.StatefulSet, pl placement.Placement,
cluster *myspec.M3DBCluster, set *appsv1.StatefulSet,
pl placement.Placement, desiredInstanceCount int,
) error {
if cluster.Spec.PreventScaleDown {
return pkgerrors.Errorf("cannot remove nodes from %s/%s, preventScaleDown is true",
cluster.Namespace, cluster.Name)
}
if desiredInstanceCount < 0 {
return pkgerrors.New(fmt.Sprintf("desired instance count is negative: %d", desiredInstanceCount))
}

selector := klabels.SelectorFromSet(set.Labels)
pods, err := c.podLister.Pods(cluster.Namespace).List(selector)
Expand All @@ -556,25 +560,36 @@ func (c *M3DBController) shrinkPlacementForSet(
return err
}

_, removeInst, err := c.findPodInstanceToRemove(cluster, pl, pods)
_, removeInst, err := c.findPodsAndInstancesToRemove(cluster, pl, pods, desiredInstanceCount)
if err != nil {
c.logger.Error("error finding pod to remove", zap.Error(err))
c.logger.Error("error finding pods to remove", zap.Error(err))
return err
}

c.logger.Info("removing pod from placement", zap.String("instance", removeInst.ID()))
return c.adminClient.placementClientForCluster(cluster).Remove([]string{removeInst.ID()})
if len(removeInst) == 0 {
c.logger.Info("nothing to remove, skipping remove call")
return nil
}

removeIds := make([]string, len(removeInst))
for idx, inst := range removeInst {
removeIds[idx] = inst.ID()
}
c.logger.Info("removing instances from placement",
zap.String("instances", strings.Join(removeIds, ",")))
return c.adminClient.placementClientForCluster(cluster).Remove(removeIds)
}

// findPodInstanceToRemove returns the pod (and associated placement instace)
// findPodsAndInstancesToRemove returns pods (and associated placement instances)
// with the highest ordinal number in the stateful set AND in the placement, so
// that we remove from the placement the pod that will be deleted when the set
// that we remove from the placement pods that will be deleted when the set
// size is scaled down.
func (c *M3DBController) findPodInstanceToRemove(
func (c *M3DBController) findPodsAndInstancesToRemove(
cluster *myspec.M3DBCluster,
pl placement.Placement,
pods []*corev1.Pod,
) (*corev1.Pod, placement.Instance, error) {
desiredInstanceCount int,
) ([]*corev1.Pod, []placement.Instance, error) {
if len(pods) == 0 {
return nil, nil, errEmptyPodList
}
Expand All @@ -584,21 +599,25 @@ func (c *M3DBController) findPodInstanceToRemove(
return nil, nil, pkgerrors.WithMessage(err, "cannot sort pods")
}

for i := len(podIDs) - 1; i >= 0; i-- {
var (
podsToRemove []*corev1.Pod
instancesToRemove []placement.Instance
)
for i := len(podIDs) - 1; i >= desiredInstanceCount; i-- {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Maybe it's worth moving the check that desiredInstanceCount is greater than zero into this function just in case we call it from other places in the future?

pod := podIDs[i].pod
inst, err := c.findPodInPlacement(cluster, pl, pod)
if pkgerrors.Cause(err) == errPodNotInPlacement {
// If the instance is already out of the placement, continue to the next
// If the pod is already out of the placement, continue to the next
// one.
continue
}
if err != nil {
return nil, nil, pkgerrors.WithMessage(err, "error finding pod in placement")
}
return pod, inst, nil
podsToRemove = append(podsToRemove, pod)
instancesToRemove = append(instancesToRemove, inst)
}

return nil, nil, errNoPodsInPlacement
return podsToRemove, instancesToRemove, nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To ensure this is doing what the user intends, do we want to return an error if len(podsToRemove) != removeCount? Not sure if there's a case where that could happen.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only case where this can happen is if we somehow want to remove more than exists. There is a test case when removeCount is for 4 but we have only 3 instances in placement.

In practice this should not happen because we remove only if placement contains more than desired:

		if inPlacement > desired {
			setLogger.Info("remove instance from placement for set")
			return c.shrinkPlacementForSet(cluster, set, placement, int(inPlacement-desired))
		}

. Maybe if we spam expand/shrink we could enter some weird state here. But I think it's hard to reason what will happen anyways.

Another approach that potentially can be safer is to instead of having "removeCount" we could pass in "desired" so that we try to reach target state, we could than assert if target state makes sense like "desired" > "currentPodCount" otherwise it looks like expansion.

}

// findPodInPlacement looks up a pod in the placement. Equality is based on
Expand Down
148 changes: 96 additions & 52 deletions pkg/controller/update_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,54 +678,98 @@ func TestExpandPlacementForSet_Err(t *testing.T) {
}

func TestShrinkPlacementForSet(t *testing.T) {
cluster := getFixture("cluster-3-zones.yaml", t)

set, err := m3db.GenerateStatefulSet(cluster, "us-fake1-a", 3)
require.NoError(t, err)

pods := podsForClusterSet(cluster, set, 3)
deps := newTestDeps(t, &testOpts{
kubeObjects: objectsFromPods(pods...),
})
placementMock := deps.placementClient
controller := deps.newController(t)
defer deps.cleanup()

identifyPods(deps.idProvider, pods, nil)
pl := placementFromPods(t, cluster, pods, deps.idProvider)

// Expect the last pod to be removed.
placementMock.EXPECT().Remove([]string{`{"name":"cluster-zones-rep0-2","uid":"2"}`})
err = controller.shrinkPlacementForSet(cluster, set, pl)
assert.NoError(t, err)

// If there are more pods in the set then in the placement, we expect the last
// in the set to be removed.
pl = placementFromPods(t, cluster, pods[:2], deps.idProvider)
placementMock.EXPECT().Remove([]string{`{"name":"cluster-zones-rep0-1","uid":"1"}`})
err = controller.shrinkPlacementForSet(cluster, set, pl)
assert.NoError(t, err)
}
const instancePerGroup = 3
tests := []struct {
name string
desiredInstanceCount int
placementInstanceCount int
preventScaleDown bool
expectedRemovedIds []string
expectedErr string
}{
{
name: "remove single pod",
desiredInstanceCount: instancePerGroup - 1,
placementInstanceCount: instancePerGroup,
expectedRemovedIds: []string{`{"name":"cluster-zones-rep0-2","uid":"2"}`},
},
{
name: "empty placement",
desiredInstanceCount: instancePerGroup - 1,
placementInstanceCount: 0,
expectedRemovedIds: nil,
},
{
name: "remove multiple pods",
desiredInstanceCount: instancePerGroup - 2,
placementInstanceCount: instancePerGroup,
expectedRemovedIds: []string{
`{"name":"cluster-zones-rep0-2","uid":"2"}`,
`{"name":"cluster-zones-rep0-1","uid":"1"}`,
},
},
{
name: "remove last when placement contains less instances than pods",
desiredInstanceCount: instancePerGroup - 2,
placementInstanceCount: instancePerGroup - 1,
expectedRemovedIds: []string{`{"name":"cluster-zones-rep0-1","uid":"1"}`},
},
{
name: "remove more pods than exists",
desiredInstanceCount: -42,
placementInstanceCount: instancePerGroup,
expectedErr: "desired instance count is negative: -42",
},
{
name: "remove all pods",
desiredInstanceCount: 0,
placementInstanceCount: instancePerGroup,
expectedRemovedIds: []string{
`{"name":"cluster-zones-rep0-2","uid":"2"}`,
`{"name":"cluster-zones-rep0-1","uid":"1"}`,
`{"name":"cluster-zones-rep0-0","uid":"0"}`,
},
},
{
name: "prevent scale down",
preventScaleDown: true,
expectedErr: "cannot remove nodes from fake/cluster-zones, preventScaleDown is true",
expectedRemovedIds: nil,
},
}

func TestShrinkPlacementForSet_PreventScaleDown(t *testing.T) {
cluster := getFixture("cluster-3-zones.yaml", t)
cluster.Spec.PreventScaleDown = true
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
cluster := getFixture("cluster-3-zones.yaml", t)
cluster.Spec.PreventScaleDown = tc.preventScaleDown

set, err := m3db.GenerateStatefulSet(cluster, "us-fake1-a", 3)
require.NoError(t, err)
set, err := m3db.GenerateStatefulSet(cluster, "us-fake1-a", 3)
require.NoError(t, err)

pods := podsForClusterSet(cluster, set, 3)
deps := newTestDeps(t, &testOpts{
kubeObjects: objectsFromPods(pods...),
})
controller := deps.newController(t)
defer deps.cleanup()
pods := podsForClusterSet(cluster, set, 3)
deps := newTestDeps(t, &testOpts{
kubeObjects: objectsFromPods(pods...),
})
placementMock := deps.placementClient
controller := deps.newController(t)
defer deps.cleanup()

identifyPods(deps.idProvider, pods, nil)
pl := placementFromPods(t, cluster, pods, deps.idProvider)
identifyPods(deps.idProvider, pods, nil)
pl := placementFromPods(t, cluster, pods[:tc.placementInstanceCount], deps.idProvider)

err = controller.shrinkPlacementForSet(cluster, set, pl)
assert.EqualError(t, err, "cannot remove nodes from fake/cluster-zones, preventScaleDown is true")
if len(tc.expectedRemovedIds) > 0 {
placementMock.EXPECT().Remove(tc.expectedRemovedIds)
}
err = controller.shrinkPlacementForSet(cluster, set, pl, tc.desiredInstanceCount)
if tc.expectedErr != "" {
require.Error(t, err)
assert.Contains(t, err.Error(), tc.expectedErr)
} else {
assert.NoError(t, err)
}
})
}
}

func podWithName(name string) *corev1.Pod {
Expand Down Expand Up @@ -1125,28 +1169,28 @@ func TestFindPodToRemove(t *testing.T) {
assert.Equal(t, pl.Instances()[0], inst)

// Can't remove from no pods.
_, _, err = controller.findPodInstanceToRemove(cluster, pl, nil)
_, _, err = controller.findPodsAndInstancesToRemove(cluster, pl, nil, 1)
assert.Equal(t, errEmptyPodList, err)

// Can't remove from malformed pod names.
_, _, err = controller.findPodInstanceToRemove(cluster, pl, []*corev1.Pod{
_, _, err = controller.findPodsAndInstancesToRemove(cluster, pl, []*corev1.Pod{
podWithName("foo"),
})
}, 1)
assert.Contains(t, err.Error(), "cannot sort pods")

// Removing from a placement w/ all pods removes the last.
pod, inst, err := controller.findPodInstanceToRemove(cluster, pl, pods)
podsToRemove, insts, err := controller.findPodsAndInstancesToRemove(cluster, pl, pods, 1)
assert.NoError(t, err)
assert.Equal(t, pods[2], pod)
assert.Equal(t, pl.Instances()[2], inst)
assert.Equal(t, pods[2], podsToRemove[0])
assert.Equal(t, pl.Instances()[2], insts[0])

// Removing from a placement w/ 2 insts and 3 pods removes the last pod that's
// still in the placement.
pl = placementFromPods(t, cluster, pods[:2], idProvider)
pod, inst, err = controller.findPodInstanceToRemove(cluster, pl, pods)
podsToRemove, insts, err = controller.findPodsAndInstancesToRemove(cluster, pl, pods, 1)
assert.NoError(t, err)
assert.Equal(t, pods[1], pod)
assert.Equal(t, pl.Instances()[1], inst)
assert.Equal(t, pods[1], podsToRemove[0])
assert.Equal(t, pl.Instances()[1], insts[0])
}

func TestEtcdFinalizer(t *testing.T) {
Expand Down