Skip to content

Commit

Permalink
batch node removal (#314)
Browse files Browse the repository at this point in the history
  • Loading branch information
Antanukas authored Mar 10, 2022
1 parent 3fe776a commit 0194136
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 70 deletions.
4 changes: 2 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,8 @@ func (c *M3DBController) handleClusterUpdate(
// If there are more pods in the placement than we want in the group,
// trigger a remove so that we can shrink the set.
if inPlacement > desired {
setLogger.Info("remove instance from placement for set")
return c.shrinkPlacementForSet(cluster, set, placement)
setLogger.Info("shrinking placement for set")
return c.shrinkPlacementForSet(cluster, set, placement, int(desired))
}

var newCount int32
Expand Down
52 changes: 36 additions & 16 deletions pkg/controller/update_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,11 +538,12 @@ func (c *M3DBController) expandPlacementForSet(
return c.addPodsToPlacement(ctx, cluster, podsToAdd)
}

// shrinkPlacementForSet takes a StatefulSet that needs to be shrunk and
// removes the last pod in the StatefulSet from the active placement, enabling
// shrinkPlacementForSet takes a StatefulSet that needs to be shrunk and removes any pods
// that are above desired instance count in the StatefulSet from the active placement, enabling
// the StatefulSet size to be decreased once the remove completes.
func (c *M3DBController) shrinkPlacementForSet(
cluster *myspec.M3DBCluster, set *appsv1.StatefulSet, pl placement.Placement,
cluster *myspec.M3DBCluster, set *appsv1.StatefulSet,
pl placement.Placement, desiredInstanceCount int,
) error {
if cluster.Spec.PreventScaleDown {
return pkgerrors.Errorf("cannot remove nodes from %s/%s, preventScaleDown is true",
Expand All @@ -556,49 +557,68 @@ func (c *M3DBController) shrinkPlacementForSet(
return err
}

_, removeInst, err := c.findPodInstanceToRemove(cluster, pl, pods)
_, removeInst, err := c.findPodsAndInstancesToRemove(cluster, pl, pods, desiredInstanceCount)
if err != nil {
c.logger.Error("error finding pod to remove", zap.Error(err))
c.logger.Error("error finding pods to remove", zap.Error(err))
return err
}

c.logger.Info("removing pod from placement", zap.String("instance", removeInst.ID()))
return c.adminClient.placementClientForCluster(cluster).Remove([]string{removeInst.ID()})
if len(removeInst) == 0 {
c.logger.Info("nothing to remove, skipping remove call")
return nil
}

removeIds := make([]string, len(removeInst))
for idx, inst := range removeInst {
removeIds[idx] = inst.ID()
}
c.logger.Info("removing instances from placement",
zap.String("instances", strings.Join(removeIds, ",")))
return c.adminClient.placementClientForCluster(cluster).Remove(removeIds)
}

// findPodInstanceToRemove returns the pod (and associated placement instance)
// findPodsAndInstancesToRemove returns pods (and associated placement instances)
// with the highest ordinal number in the stateful set AND in the placement, so
// that we remove from the placement the pod that will be deleted when the set
// that we remove from the placement pods that will be deleted when the set
// size is scaled down.
func (c *M3DBController) findPodInstanceToRemove(
func (c *M3DBController) findPodsAndInstancesToRemove(
cluster *myspec.M3DBCluster,
pl placement.Placement,
pods []*corev1.Pod,
) (*corev1.Pod, placement.Instance, error) {
desiredInstanceCount int,
) ([]*corev1.Pod, []placement.Instance, error) {
if len(pods) == 0 {
return nil, nil, errEmptyPodList
}
if desiredInstanceCount < 0 {
msg := fmt.Sprintf("desired instance count is negative: %d", desiredInstanceCount)
return nil, nil, pkgerrors.New(msg)
}

podIDs, err := sortPods(pods)
if err != nil {
return nil, nil, pkgerrors.WithMessage(err, "cannot sort pods")
}

for i := len(podIDs) - 1; i >= 0; i-- {
var (
podsToRemove []*corev1.Pod
instancesToRemove []placement.Instance
)
for i := len(podIDs) - 1; i >= desiredInstanceCount; i-- {
pod := podIDs[i].pod
inst, err := c.findPodInPlacement(cluster, pl, pod)
if pkgerrors.Cause(err) == errPodNotInPlacement {
// If the instance is already out of the placement, continue to the next
// If the pod is already out of the placement, continue to the next
// one.
continue
}
if err != nil {
return nil, nil, pkgerrors.WithMessage(err, "error finding pod in placement")
}
return pod, inst, nil
podsToRemove = append(podsToRemove, pod)
instancesToRemove = append(instancesToRemove, inst)
}

return nil, nil, errNoPodsInPlacement
return podsToRemove, instancesToRemove, nil
}

// findPodInPlacement looks up a pod in the placement. Equality is based on
Expand Down
148 changes: 96 additions & 52 deletions pkg/controller/update_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,54 +678,98 @@ func TestExpandPlacementForSet_Err(t *testing.T) {
}

func TestShrinkPlacementForSet(t *testing.T) {
cluster := getFixture("cluster-3-zones.yaml", t)

set, err := m3db.GenerateStatefulSet(cluster, "us-fake1-a", 3)
require.NoError(t, err)

pods := podsForClusterSet(cluster, set, 3)
deps := newTestDeps(t, &testOpts{
kubeObjects: objectsFromPods(pods...),
})
placementMock := deps.placementClient
controller := deps.newController(t)
defer deps.cleanup()

identifyPods(deps.idProvider, pods, nil)
pl := placementFromPods(t, cluster, pods, deps.idProvider)

// Expect the last pod to be removed.
placementMock.EXPECT().Remove([]string{`{"name":"cluster-zones-rep0-2","uid":"2"}`})
err = controller.shrinkPlacementForSet(cluster, set, pl)
assert.NoError(t, err)

// If there are more pods in the set than in the placement, we expect the last
// in the set to be removed.
pl = placementFromPods(t, cluster, pods[:2], deps.idProvider)
placementMock.EXPECT().Remove([]string{`{"name":"cluster-zones-rep0-1","uid":"1"}`})
err = controller.shrinkPlacementForSet(cluster, set, pl)
assert.NoError(t, err)
}
const instanceCountPerGroup = 3
tests := []struct {
name string
desiredInstanceCount int
placementInstanceCount int
preventScaleDown bool
expectedRemovedIds []string
expectedErr string
}{
{
name: "remove single pod",
desiredInstanceCount: instanceCountPerGroup - 1,
placementInstanceCount: instanceCountPerGroup,
expectedRemovedIds: []string{`{"name":"cluster-zones-rep0-2","uid":"2"}`},
},
{
name: "empty placement",
desiredInstanceCount: instanceCountPerGroup - 1,
placementInstanceCount: 0,
expectedRemovedIds: nil,
},
{
name: "remove multiple pods",
desiredInstanceCount: instanceCountPerGroup - 2,
placementInstanceCount: instanceCountPerGroup,
expectedRemovedIds: []string{
`{"name":"cluster-zones-rep0-2","uid":"2"}`,
`{"name":"cluster-zones-rep0-1","uid":"1"}`,
},
},
{
name: "remove last when placement contains less instances than pods",
desiredInstanceCount: instanceCountPerGroup - 2,
placementInstanceCount: instanceCountPerGroup - 1,
expectedRemovedIds: []string{`{"name":"cluster-zones-rep0-1","uid":"1"}`},
},
{
name: "remove more pods than exists",
desiredInstanceCount: -42,
placementInstanceCount: instanceCountPerGroup,
expectedErr: "desired instance count is negative: -42",
},
{
name: "remove all pods",
desiredInstanceCount: 0,
placementInstanceCount: instanceCountPerGroup,
expectedRemovedIds: []string{
`{"name":"cluster-zones-rep0-2","uid":"2"}`,
`{"name":"cluster-zones-rep0-1","uid":"1"}`,
`{"name":"cluster-zones-rep0-0","uid":"0"}`,
},
},
{
name: "prevent scale down",
preventScaleDown: true,
expectedErr: "cannot remove nodes from fake/cluster-zones, preventScaleDown is true",
expectedRemovedIds: nil,
},
}

func TestShrinkPlacementForSet_PreventScaleDown(t *testing.T) {
cluster := getFixture("cluster-3-zones.yaml", t)
cluster.Spec.PreventScaleDown = true
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
cluster := getFixture("cluster-3-zones.yaml", t)
cluster.Spec.PreventScaleDown = tc.preventScaleDown

set, err := m3db.GenerateStatefulSet(cluster, "us-fake1-a", 3)
require.NoError(t, err)
set, err := m3db.GenerateStatefulSet(cluster, "us-fake1-a", 3)
require.NoError(t, err)

pods := podsForClusterSet(cluster, set, 3)
deps := newTestDeps(t, &testOpts{
kubeObjects: objectsFromPods(pods...),
})
controller := deps.newController(t)
defer deps.cleanup()
pods := podsForClusterSet(cluster, set, 3)
deps := newTestDeps(t, &testOpts{
kubeObjects: objectsFromPods(pods...),
})
placementMock := deps.placementClient
controller := deps.newController(t)
defer deps.cleanup()

identifyPods(deps.idProvider, pods, nil)
pl := placementFromPods(t, cluster, pods, deps.idProvider)
identifyPods(deps.idProvider, pods, nil)
pl := placementFromPods(t, cluster, pods[:tc.placementInstanceCount], deps.idProvider)

err = controller.shrinkPlacementForSet(cluster, set, pl)
assert.EqualError(t, err, "cannot remove nodes from fake/cluster-zones, preventScaleDown is true")
if len(tc.expectedRemovedIds) > 0 {
placementMock.EXPECT().Remove(tc.expectedRemovedIds)
}
err = controller.shrinkPlacementForSet(cluster, set, pl, tc.desiredInstanceCount)
if tc.expectedErr != "" {
require.Error(t, err)
assert.Contains(t, err.Error(), tc.expectedErr)
} else {
assert.NoError(t, err)
}
})
}
}

func podWithName(name string) *corev1.Pod {
Expand Down Expand Up @@ -1125,28 +1169,28 @@ func TestFindPodToRemove(t *testing.T) {
assert.Equal(t, pl.Instances()[0], inst)

// Can't remove from no pods.
_, _, err = controller.findPodInstanceToRemove(cluster, pl, nil)
_, _, err = controller.findPodsAndInstancesToRemove(cluster, pl, nil, 1)
assert.Equal(t, errEmptyPodList, err)

// Can't remove from malformed pod names.
_, _, err = controller.findPodInstanceToRemove(cluster, pl, []*corev1.Pod{
_, _, err = controller.findPodsAndInstancesToRemove(cluster, pl, []*corev1.Pod{
podWithName("foo"),
})
}, 1)
assert.Contains(t, err.Error(), "cannot sort pods")

// Removing from a placement w/ all pods removes the last.
pod, inst, err := controller.findPodInstanceToRemove(cluster, pl, pods)
podsToRemove, insts, err := controller.findPodsAndInstancesToRemove(cluster, pl, pods, 1)
assert.NoError(t, err)
assert.Equal(t, pods[2], pod)
assert.Equal(t, pl.Instances()[2], inst)
assert.Equal(t, pods[2], podsToRemove[0])
assert.Equal(t, pl.Instances()[2], insts[0])

// Removing from a placement w/ 2 insts and 3 pods removes the last pod that's
// still in the placement.
pl = placementFromPods(t, cluster, pods[:2], idProvider)
pod, inst, err = controller.findPodInstanceToRemove(cluster, pl, pods)
podsToRemove, insts, err = controller.findPodsAndInstancesToRemove(cluster, pl, pods, 1)
assert.NoError(t, err)
assert.Equal(t, pods[1], pod)
assert.Equal(t, pl.Instances()[1], inst)
assert.Equal(t, pods[1], podsToRemove[0])
assert.Equal(t, pl.Instances()[1], insts[0])
}

func TestEtcdFinalizer(t *testing.T) {
Expand Down

0 comments on commit 0194136

Please sign in to comment.