Skip to content

Commit

Permalink
feat: support memberjoin action (#8395)
Browse files Browse the repository at this point in the history
  • Loading branch information
kubeJocker authored Nov 14, 2024
1 parent 7ec2c5c commit 3179905
Show file tree
Hide file tree
Showing 6 changed files with 545 additions and 26 deletions.
190 changes: 165 additions & 25 deletions controllers/apps/transformer_component_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"reflect"
"slices"
"strings"
"time"

"github.com/spf13/viper"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -451,6 +452,10 @@ func (r *componentWorkloadOps) expandVolume() error {
// horizontalScale handles workload horizontal scale
func (r *componentWorkloadOps) horizontalScale() error {
its := r.runningITS
// handle memberjoin lifecycle action
if err := r.checkAndDoMemberJoin(); err != nil {
return err
}
doScaleOut, doScaleIn := r.horizontalScaling()
if !doScaleOut && !doScaleIn {
if err := r.postScaleIn(); err != nil {
Expand Down Expand Up @@ -561,6 +566,9 @@ func (r *componentWorkloadOps) scaleOut(itsObj *workloads.InstanceSet) error {
if *itsObj.Spec.Replicas == 0 {
return nil
}

r.annotateInstanceSetForMemberJoin()

graphCli := model.NewGraphClient(r.cli)
graphCli.Noop(r.dag, r.protoITS)
d, err := newDataClone(r.reqCtx, r.cli, r.cluster, r.synthesizeComp, itsObj, r.protoITS, backupKey)
Expand Down Expand Up @@ -597,11 +605,84 @@ func (r *componentWorkloadOps) scaleOut(itsObj *workloads.InstanceSet) error {
}
}

func (r *componentWorkloadOps) annotateInstanceSetForMemberJoin() {
if r.synthesizeComp.LifecycleActions.MemberJoin == nil {
return
}

podsToMemberjoin := getPodsToMemberJoinFromAnno(r.runningITS)

for podName := range r.desiredCompPodNameSet {
if r.runningItsPodNameSet.Has(podName) {
continue
}
if podsToMemberjoin.Has(podName) {
continue
}
podsToMemberjoin.Insert(podName)
}

if podsToMemberjoin.Len() > 0 {
r.protoITS.Annotations[constant.MemberJoinStatusAnnotationKey] = strings.Join(sets.List(podsToMemberjoin), ",")
}
}

func getPodsToMemberJoinFromAnno(instanceSet *workloads.InstanceSet) sets.Set[string] {
podsToMemberjoin := sets.New[string]()
if instanceSet == nil {
return podsToMemberjoin
}

if instanceSet.Annotations == nil {
return podsToMemberjoin
}

if memberJoinStatus := instanceSet.Annotations[constant.MemberJoinStatusAnnotationKey]; memberJoinStatus != "" {
podsToMemberjoin.Insert(strings.Split(memberJoinStatus, ",")...)
}

return podsToMemberjoin
}

func (r *componentWorkloadOps) leaveMember4ScaleIn() error {
pods, err := component.ListOwnedPods(r.reqCtx.Ctx, r.cli, r.cluster.Namespace, r.cluster.Name, r.synthesizeComp.Name)
if err != nil {
return err
}

// TODO: Move memberLeave to the ITS controller. Instead of performing a switchover, we can directly scale down the non-leader nodes. This is because the pod ordinal is not guaranteed to be continuous.
podsToMemberLeave := make([]*corev1.Pod, 0)

podsToMemberjoin := getPodsToMemberJoinFromAnno(r.runningITS)
for _, pod := range pods {
// if the pod not exists in the generated pod names, it should be a member that needs to leave
if _, ok := r.desiredCompPodNameSet[pod.Name]; ok {
continue
}
podsToMemberLeave = append(podsToMemberLeave, pod)
}

var leaveErrors []error
for _, pod := range podsToMemberLeave {

if podsToMemberjoin.Has(pod.Name) {
leaveErrors = append(leaveErrors, fmt.Errorf("pod %s is in memberjoin process", pod.Name))
continue
}

if err := r.leaveMemberForPod(pod, pods); err != nil {
leaveErrors = append(leaveErrors, err)
}

}
if len(leaveErrors) > 0 {
return newRequeueError(time.Second, fmt.Sprintf("%v", leaveErrors))
}

return nil
}

func (r *componentWorkloadOps) leaveMemberForPod(pod *corev1.Pod, pods []*corev1.Pod) error {
isLeader := func(pod *corev1.Pod) bool {
if pod == nil || len(pod.Labels) == 0 {
return false
Expand Down Expand Up @@ -635,41 +716,100 @@ func (r *componentWorkloadOps) leaveMember4ScaleIn() error {
return err
}

// TODO: Move memberLeave to the ITS controller. Instead of performing a switchover, we can directly scale down the non-leader nodes. This is because the pod ordinal is not guaranteed to be continuous.
podsToMemberLeave := make([]*corev1.Pod, 0)
for _, pod := range pods {
// if the pod not exists in the generated pod names, it should be a member that needs to leave
if _, ok := r.desiredCompPodNameSet[pod.Name]; ok {
continue
}
podsToMemberLeave = append(podsToMemberLeave, pod)
if !(isLeader(pod) || // if the pod is leader, it needs to call switchover
(r.synthesizeComp.LifecycleActions != nil && r.synthesizeComp.LifecycleActions.MemberLeave != nil)) { // if the memberLeave action is defined, it needs to call it
return nil
}
for _, pod := range podsToMemberLeave {
if !(isLeader(pod) || // if the pod is leader, it needs to call switchover
(r.synthesizeComp.LifecycleActions != nil && r.synthesizeComp.LifecycleActions.MemberLeave != nil)) { // if the memberLeave action is defined, it needs to call it
continue

lfa, err := lifecycle.New(r.synthesizeComp, pod, pods...)
if err != nil {
return err
}

// switchover if the leaving pod is leader
if switchoverErr := tryToSwitchover(lfa, pod); switchoverErr != nil {
return switchoverErr
}

if err = lfa.MemberLeave(r.reqCtx.Ctx, r.cli, nil); err != nil {
if !errors.Is(err, lifecycle.ErrActionNotDefined) && err == nil {
return err
}
}
return nil
}

lfa, err1 := lifecycle.New(r.synthesizeComp, pod, pods...)
if err1 != nil {
if err == nil {
err = err1
}
func (r *componentWorkloadOps) checkAndDoMemberJoin() error {
// just wait for memberjoin anno to be updated
if r.protoITS.Annotations[constant.MemberJoinStatusAnnotationKey] != "" {
return nil
}

podsToMemberjoin := getPodsToMemberJoinFromAnno(r.runningITS)
if len(podsToMemberjoin) == 0 {
return nil
}

if r.synthesizeComp.LifecycleActions == nil || r.synthesizeComp.LifecycleActions.MemberJoin == nil {
podsToMemberjoin.Clear()
}
err := r.doMemberJoin(podsToMemberjoin)
if err != nil {
return err
}

if podsToMemberjoin.Len() == 0 {
// Anno will be merged later, so it should be deleted from both protoITS and runningITS
delete(r.protoITS.Annotations, constant.MemberJoinStatusAnnotationKey)
delete(r.runningITS.Annotations, constant.MemberJoinStatusAnnotationKey)
} else {
r.protoITS.Annotations[constant.MemberJoinStatusAnnotationKey] = strings.Join(sets.List(podsToMemberjoin), ",")
}
return nil
}

func (r *componentWorkloadOps) doMemberJoin(podSet sets.Set[string]) error {
if len(podSet) == 0 {
return nil
}

runningPods, err := component.ListOwnedPods(r.reqCtx.Ctx, r.cli, r.cluster.Namespace, r.cluster.Name, r.synthesizeComp.Name)
if err != nil {
return err
}

var joinErrors []error
for _, pod := range runningPods {
if !podSet.Has(pod.Name) {
continue
}

// switchover if the leaving pod is leader
if switchoverErr := tryToSwitchover(lfa, pod); switchoverErr != nil {
return switchoverErr
if err := r.joinMemberForPod(pod, runningPods); err != nil {
joinErrors = append(joinErrors, fmt.Errorf("pod %s: %w", pod.Name, err))
} else {
podSet.Delete(pod.Name)
}
}

if err2 := lfa.MemberLeave(r.reqCtx.Ctx, r.cli, nil); err2 != nil {
if !errors.Is(err2, lifecycle.ErrActionNotDefined) && err == nil {
err = err2
}
if len(joinErrors) > 0 {
return newRequeueError(time.Second, fmt.Sprintf("%v", joinErrors))
}
return nil
}

func (r *componentWorkloadOps) joinMemberForPod(pod *corev1.Pod, pods []*corev1.Pod) error {
lfa, err := lifecycle.New(r.synthesizeComp, pod, pods...)
if err != nil {
return err
}

if err = lfa.MemberJoin(r.reqCtx.Ctx, r.cli, nil); err != nil {
if !errors.Is(err, lifecycle.ErrActionNotDefined) {
return err
}
}
return err // TODO: use requeue-after

return nil
}

func (r *componentWorkloadOps) deletePVCs4ScaleIn(itsObj *workloads.InstanceSet) error {
Expand Down
Loading

0 comments on commit 3179905

Please sign in to comment.