Skip to content

Commit

Permalink
Merge pull request radondb#282 from runkecheng/feature_update_node_without_restart
Browse files Browse the repository at this point in the history

*:update replicas without restart
  • Loading branch information
andyli029 authored Nov 23, 2021
2 parents fe7f54c + f71b787 commit 11f5ab2
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 199 deletions.
10 changes: 9 additions & 1 deletion api/v1alpha1/mysqlcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,14 +247,18 @@ type Persistence struct {
// ClusterState represents the observed lifecycle phase of a MysqlCluster.
type ClusterState string

const (
	// ClusterInitState indicates whether the cluster is initializing.
	ClusterInitState ClusterState = "Initializing"
	// ClusterUpdateState indicates whether the cluster is being updated.
	ClusterUpdateState ClusterState = "Updating"
	// ClusterReadyState indicates whether all containers in the pod are ready.
	ClusterReadyState ClusterState = "Ready"
	// ClusterCloseState indicates whether the cluster is closed.
	ClusterCloseState ClusterState = "Closed"
	// ClusterScaleInState indicates whether the cluster replicas is decreasing.
	ClusterScaleInState ClusterState = "ScaleIn"
	// ClusterScaleOutState indicates whether the cluster replicas is increasing.
	ClusterScaleOutState ClusterState = "ScaleOut"
)

// ClusterConditionType defines type for cluster condition type.
Expand All @@ -271,6 +275,10 @@ const (
ConditionClose ClusterConditionType = "Closed"
// ConditionError indicates whether there is an error in the cluster.
ConditionError ClusterConditionType = "Error"
// ConditionScaleIn indicates whether the cluster replicas is decreasing.
ConditionScaleIn ClusterConditionType = "ScaleIn"
// ConditionScaleOut indicates whether the cluster replicas is increasing.
ConditionScaleOut ClusterConditionType = "ScaleOut"
)

// ClusterCondition defines type for cluster conditions.
Expand Down
41 changes: 41 additions & 0 deletions internal/xenon_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ type XenonExecutor interface {
GetRootPassword() string
SetRootPassword(rootPassword string)
RaftStatus(host string) (*apiv1alpha1.RaftStatus, error)
XenonPing(host string) error
RaftTryToLeader(host string) error
ClusterAdd(host string, toAdd string) error
ClusterRemove(host string, toRemove string) error
}

func NewXenonExecutor() XenonExecutor {
Expand Down Expand Up @@ -86,3 +89,41 @@ func (executor *xenonExecutor) RaftTryToLeader(host string) error {
}
return nil
}

// XenonPing verifies that the xenon instance at host is alive by issuing
// a ping request. A non-nil error means the host could not be reached or
// the request could not be built.
func (executor *xenonExecutor) XenonPing(host string) error {
	req, err := NewXenonHttpRequest(NewRequestConfig(host, executor.GetRootPassword(), utils.XenonPing, nil))
	if err != nil {
		return err
	}
	// Wrap with %w (not %s) so callers can unwrap with errors.Is/errors.As.
	if _, err = executor.httpExecutor.Execute(req); err != nil {
		return fmt.Errorf("failed to ping host[%s], err: %w", req.Req.URL, err)
	}
	return nil
}

// ClusterAdd asks the xenon instance at host to add toAdd ("ip:port")
// as a new raft member.
func (executor *xenonExecutor) ClusterAdd(host string, toAdd string) error {
	// Xenon expects a JSON body of the form {"address": "<ip:port>"}.
	addHost := fmt.Sprintf("{\"address\": \"%s\"}", toAdd)
	req, err := NewXenonHttpRequest(NewRequestConfig(host, executor.GetRootPassword(), utils.ClusterAdd, addHost))
	if err != nil {
		return err
	}
	// Wrap with %w (not %s) so callers can unwrap with errors.Is/errors.As.
	if _, err = executor.httpExecutor.Execute(req); err != nil {
		return fmt.Errorf("failed to add host[%s] to host[%s], err: %w", addHost, req.Req.URL, err)
	}
	return nil
}

// ClusterRemove asks the xenon instance at host to remove toRemove
// ("ip:port") from its raft membership.
func (executor *xenonExecutor) ClusterRemove(host string, toRemove string) error {
	// Xenon expects a JSON body of the form {"address": "<ip:port>"}.
	removeHost := fmt.Sprintf("{\"address\": \"%s\"}", toRemove)
	req, err := NewXenonHttpRequest(NewRequestConfig(host, executor.GetRootPassword(), utils.ClusterRemove, removeHost))
	if err != nil {
		return err
	}
	// Wrap with %w (not %s) so callers can unwrap with errors.Is/errors.As.
	if _, err = executor.httpExecutor.Execute(req); err != nil {
		return fmt.Errorf("failed to remove host[%s] from host[%s], err: %w", removeHost, req.Req.URL, err)
	}
	return nil
}
6 changes: 0 additions & 6 deletions mysqlcluster/container/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package container

import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"

Expand Down Expand Up @@ -59,10 +57,6 @@ func (c *backupSidecar) getEnvVars() []corev1.EnvVar {
Name: "SERVICE_NAME",
Value: c.GetNameForResource(utils.HeadlessSVC),
},
{
Name: "REPLICAS",
Value: fmt.Sprintf("%d", *c.Spec.Replicas),
},
{
Name: "MYSQL_ROOT_PASSWORD",
Value: c.Spec.MysqlOpts.RootPassword,
Expand Down
5 changes: 0 additions & 5 deletions mysqlcluster/container/init_sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package container

import (
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -79,10 +78,6 @@ func (c *initSidecar) getEnvVars() []corev1.EnvVar {
Name: "STATEFULSET_NAME",
Value: c.GetNameForResource(utils.StatefulSet),
},
{
Name: "REPLICAS",
Value: fmt.Sprintf("%d", *c.Spec.Replicas),
},
{
Name: "ADMIT_DEFEAT_HEARBEAT_COUNT",
Value: strconv.Itoa(int(*c.Spec.XenonOpts.AdmitDefeatHearbeatCount)),
Expand Down
5 changes: 0 additions & 5 deletions mysqlcluster/container/init_sidecar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package container

import (
"fmt"
"strconv"
"testing"

Expand Down Expand Up @@ -92,10 +91,6 @@ var (
Name: "STATEFULSET_NAME",
Value: "sample-mysql",
},
{
Name: "REPLICAS",
Value: fmt.Sprintf("%d", *testInitSidecarCluster.Spec.Replicas),
},
{
Name: "ADMIT_DEFEAT_HEARBEAT_COUNT",
Value: strconv.Itoa(int(*testInitSidecarCluster.Spec.XenonOpts.AdmitDefeatHearbeatCount)),
Expand Down
13 changes: 1 addition & 12 deletions mysqlcluster/container/xenon.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,7 @@ func (c *xenon) getEnvVars() []corev1.EnvVar {

// getLifecycle returns the container lifecycle hooks for the xenon container.
// It returns nil: the post-start/pre-stop scripts were dropped because xenon
// membership is now reconciled by the operator (see the companion test, which
// asserts a nil Lifecycle). NOTE(review): the diff rendering here fused the
// deleted hook body with the new `return nil`, leaving unreachable code; this
// is the intended post-change body.
func (c *xenon) getLifecycle() *corev1.Lifecycle {
	return nil
}

// getResources get the container resources.
Expand Down
16 changes: 2 additions & 14 deletions mysqlcluster/container/xenon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

var (
xenonReplicas int32 = 1
xenonReplicas int32 = 3
xenonMysqlCluster = mysqlv1alpha1.MysqlCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "sample",
Expand Down Expand Up @@ -70,19 +70,7 @@ func TestGetXenonEnvVar(t *testing.T) {
}

// TestGetXenonLifecycle verifies that the xenon container defines no
// lifecycle hooks. The previous `lifecycle := &corev1.Lifecycle{...}`
// construction was left behind by the diff rendering and is removed here:
// an unused local variable is a compile error in Go.
func TestGetXenonLifecycle(t *testing.T) {
	assert.Nil(t, xenonCase.Lifecycle)
}

func TestGetXenonResources(t *testing.T) {
Expand Down
66 changes: 61 additions & 5 deletions mysqlcluster/syncer/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,13 @@ func (s *StatusSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {

s.Status.ReadyNodes = len(readyNodes)
if s.Status.ReadyNodes == int(*s.Spec.Replicas) && int(*s.Spec.Replicas) != 0 {
s.Status.State = apiv1alpha1.ClusterReadyState
clusterCondition.Type = apiv1alpha1.ConditionReady
if err := s.reconcileXenon(s.Status.ReadyNodes); err != nil {
clusterCondition.Message = fmt.Sprintf("%s", err)
clusterCondition.Type = apiv1alpha1.ConditionError
} else {
s.Status.State = apiv1alpha1.ClusterReadyState
clusterCondition.Type = apiv1alpha1.ConditionReady
}
}

if len(s.Status.Conditions) == 0 {
Expand Down Expand Up @@ -163,9 +168,13 @@ func (s *StatusSyncer) updateClusterStatus() apiv1alpha1.ClusterCondition {
// When the cluster is ready or closed, the number of replicas changes,
// indicating that the cluster is updating nodes.
if oldState == apiv1alpha1.ClusterReadyState || oldState == apiv1alpha1.ClusterCloseState {
if int(*s.Spec.Replicas) != s.Status.ReadyNodes {
clusterCondition.Type = apiv1alpha1.ConditionUpdate
s.Status.State = apiv1alpha1.ClusterUpdateState
if int(*s.Spec.Replicas) > s.Status.ReadyNodes {
clusterCondition.Type = apiv1alpha1.ConditionScaleOut
s.Status.State = apiv1alpha1.ClusterScaleOutState
return clusterCondition
} else if int(*s.Spec.Replicas) < s.Status.ReadyNodes {
clusterCondition.Type = apiv1alpha1.ConditionScaleIn
s.Status.State = apiv1alpha1.ClusterScaleInState
return clusterCondition
}
}
Expand Down Expand Up @@ -302,6 +311,53 @@ func (s *StatusSyncer) updateNodeRaftStatus(node *apiv1alpha1.NodeStatus) error
return nil
}

// reconcileXenon drives each node's xenon membership toward the expected
// peer set for readyNodes replicas: for every node, stale members are
// removed first, then missing peers are added. It stops and returns the
// first error encountered.
func (s *StatusSyncer) reconcileXenon(readyNodes int) error {
	expected := s.getExpectXenonNodes(readyNodes)
	for i := range s.Status.Nodes {
		node := &s.Status.Nodes[i]
		// Members present on the node but absent from the expected set.
		stale := utils.StringDiffIn(node.RaftStatus.Nodes, expected)
		// Expected peers the node does not yet know about.
		missing := utils.StringDiffIn(expected, node.RaftStatus.Nodes)
		if err := s.removeNodesFromXenon(node.Name, stale); err != nil {
			return err
		}
		if err := s.addNodesInXenon(node.Name, missing); err != nil {
			return err
		}
	}
	return nil
}

// getExpectXenonNodes builds the expected xenon member list: one
// "<pod-hostname>:<xenon-port>" entry per ready node.
func (s *StatusSyncer) getExpectXenonNodes(readyNodes int) []string {
	// Pre-size the slice: the final length is known up front, avoiding
	// repeated growth copies in append.
	expectXenonNodes := make([]string, 0, readyNodes)
	for i := 0; i < readyNodes; i++ {
		expectXenonNodes = append(expectXenonNodes, fmt.Sprintf("%s:%d", s.GetPodHostName(i), utils.XenonPort))
	}
	return expectXenonNodes
}

// removeNodesFromXenon removes every address in toRemove from the xenon
// cluster at host. The host is pinged first, so an unreachable xenon
// surfaces as an error even when there is nothing to remove.
func (s *StatusSyncer) removeNodesFromXenon(host string, toRemove []string) error {
	if err := s.XenonExecutor.XenonPing(host); err != nil {
		return err
	}
	for _, peer := range toRemove {
		err := s.XenonExecutor.ClusterRemove(host, peer)
		if err != nil {
			return err
		}
	}
	return nil
}

// addNodesInXenon registers every address in toAdd with the xenon cluster
// at host. The host is pinged first, so an unreachable xenon surfaces as
// an error even when there is nothing to add.
func (s *StatusSyncer) addNodesInXenon(host string, toAdd []string) error {
	if err := s.XenonExecutor.XenonPing(host); err != nil {
		return err
	}
	for _, peer := range toAdd {
		err := s.XenonExecutor.ClusterAdd(host, peer)
		if err != nil {
			return err
		}
	}
	return nil
}

// setPodHealthy set the pod lable healthy.
func (s *StatusSyncer) setPodHealthy(ctx context.Context, pod *corev1.Pod, node *apiv1alpha1.NodeStatus) error {
healthy := "no"
Expand Down
Loading

0 comments on commit 11f5ab2

Please sign in to comment.