Skip to content

Commit

Permalink
*: Support display the raft status of the node in nodes.conditions. #284
Browse files Browse the repository at this point in the history
  • Loading branch information
runkecheng committed Nov 15, 2021
1 parent 61e8a36 commit 5df7a54
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 126 deletions.
2 changes: 2 additions & 0 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func main() {
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("controller.mysqlcluster"),
SQLRunnerFactory: internal.NewSQLRunner,
XenonExecutor: internal.NewXenonExecutor(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MysqlCluster")
os.Exit(1)
Expand All @@ -93,6 +94,7 @@ func main() {
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("controller.status"),
SQLRunnerFactory: internal.NewSQLRunner,
XenonExecutor: internal.NewXenonExecutor(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Status")
os.Exit(1)
Expand Down
6 changes: 5 additions & 1 deletion controllers/mysqlcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type MysqlClusterReconciler struct {

// Mysql query runner.
internal.SQLRunnerFactory
// XenonExecutor is used to execute Xenon HTTP instructions.
internal.XenonExecutor
}

// +kubebuilder:rbac:groups=mysql.radondb.com,resources=mysqlclusters,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -110,6 +112,8 @@ func (r *MysqlClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
cmRev := configMapSyncer.Object().(*corev1.ConfigMap).ResourceVersion
sctRev := secretSyncer.Object().(*corev1.Secret).ResourceVersion

r.XenonExecutor.SetRootPassword(instance.Spec.MysqlOpts.RootPassword)

// run the syncers for services, pdb and statefulset
syncers := []syncer.Interface{
clustersyncer.NewRoleSyncer(r.Client, instance),
Expand All @@ -118,7 +122,7 @@ func (r *MysqlClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
clustersyncer.NewHeadlessSVCSyncer(r.Client, instance),
clustersyncer.NewLeaderSVCSyncer(r.Client, instance),
clustersyncer.NewFollowerSVCSyncer(r.Client, instance),
clustersyncer.NewStatefulSetSyncer(r.Client, instance, cmRev, sctRev, r.SQLRunnerFactory),
clustersyncer.NewStatefulSetSyncer(r.Client, instance, cmRev, sctRev, r.SQLRunnerFactory, r.XenonExecutor),
clustersyncer.NewPDBSyncer(r.Client, instance),
}

Expand Down
23 changes: 1 addition & 22 deletions controllers/mysqluser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (r *MysqlUserReconciler) reconcileUserInDB(ctx context.Context, mysqlUser *
}

// Remove allowed hosts for user.
toRemove := stringDiffIn(mysqlUser.Status.AllowedHosts, mysqlUser.Spec.Hosts)
toRemove := utils.StringDiffIn(mysqlUser.Status.AllowedHosts, mysqlUser.Spec.Hosts)
for _, host := range toRemove {
if err := internal.DropUser(sqlRunner, mysqlUser.Spec.User, host); err != nil {
return err
Expand All @@ -199,27 +199,6 @@ func (r *MysqlUserReconciler) reconcileUserInDB(ctx context.Context, mysqlUser *
return nil
}

func stringDiffIn(actual, desired []string) []string {
diff := []string{}
for _, aStr := range actual {
// If is not in the desired list remove it.
if _, exists := stringIn(aStr, desired); !exists {
diff = append(diff, aStr)
}
}

return diff
}

func stringIn(str string, strs []string) (int, bool) {
for i, s := range strs {
if s == str {
return i, true
}
}
return 0, false
}

func (r *MysqlUserReconciler) dropUserFromDB(ctx context.Context, mysqlUser *mysqluser.MysqlUser) error {
sqlRunner, closeConn, err := r.SQLRunnerFactory(internal.NewConfigFromClusterKey(
r.Client, mysqlUser.GetClusterKey(), utils.RootUser, utils.LeaderHost))
Expand Down
6 changes: 5 additions & 1 deletion controllers/status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type StatusReconciler struct {

// Mysql query runner.
internal.SQLRunnerFactory
// XenonExecutor is used to execute Xenon HTTP instructions.
internal.XenonExecutor
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
Expand Down Expand Up @@ -92,7 +94,9 @@ func (r *StatusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}
}()

statusSyncer := clustersyncer.NewStatusSyncer(instance, r.Client, r.SQLRunnerFactory)
r.XenonExecutor.SetRootPassword(instance.Spec.MysqlOpts.RootPassword)

statusSyncer := clustersyncer.NewStatusSyncer(instance, r.Client, r.SQLRunnerFactory, r.XenonExecutor)
if err := syncer.Sync(ctx, statusSyncer, r.Recorder); err != nil {
return ctrl.Result{}, err
}
Expand Down
60 changes: 23 additions & 37 deletions mysqlcluster/syncer/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@ package syncer

import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"net/http"
"time"

"github.com/iancoleman/strcase"
Expand Down Expand Up @@ -66,10 +63,12 @@ type StatefulSetSyncer struct {

// Mysql query runner.
internal.SQLRunnerFactory
// XenonExecutor is used to execute Xenon HTTP instructions.
internal.XenonExecutor
}

// NewStatefulSetSyncer returns a pointer to StatefulSetSyncer.
func NewStatefulSetSyncer(cli client.Client, c *mysqlcluster.MysqlCluster, cmRev, sctRev string, sqlRunnerFactory internal.SQLRunnerFactory) *StatefulSetSyncer {
func NewStatefulSetSyncer(cli client.Client, c *mysqlcluster.MysqlCluster, cmRev, sctRev string, sqlRunnerFactory internal.SQLRunnerFactory, xenonExecutor internal.XenonExecutor) *StatefulSetSyncer {
return &StatefulSetSyncer{
MysqlCluster: c,
cli: cli,
Expand All @@ -86,6 +85,7 @@ func NewStatefulSetSyncer(cli client.Client, c *mysqlcluster.MysqlCluster, cmRev
cmRev: cmRev,
sctRev: sctRev,
SQLRunnerFactory: sqlRunnerFactory,
XenonExecutor: xenonExecutor,
}
}

Expand Down Expand Up @@ -422,26 +422,33 @@ func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower stri

followerHost := fmt.Sprintf("%s.%s.%s", follower, svcName, nameSpace)
if err = retry(time.Second*5, time.Second*60, func() (bool, error) {
// Check whether is leader.
status, err := checkRole(followerHost, rootPasswd)
if err != nil {
log.Error(err, "failed to check role", "pod", follower)
return false, nil
}
if status == corev1.ConditionTrue {
return true, nil
for _, node := range s.Status.Nodes {
host := node.Name
if host == followerHost && !s.isLeader(host) {
if err := s.XenonExecutor.RaftTryToLeader(followerHost); err != nil {
return false, err
}
}
}

// If not leader, try to leader.
xenonHttpRequest(followerHost, "POST", "/v1/raft/trytoleader", rootPasswd, nil)
return false, nil
return true, nil
}); err != nil {
return err
}

return nil
}

func (s *StatefulSetSyncer) isLeader(host string) bool {
raftStatus, err := s.XenonExecutor.RaftStatus(host)
if err != nil {
return false
}
if raftStatus.Role == string(utils.Leader) {
return true
}
return false
}

// mutate set the statefulset.
func (s *StatefulSetSyncer) mutate() error {
s.sfs.Spec.ServiceName = s.GetNameForResource(utils.StatefulSet)
Expand Down Expand Up @@ -656,27 +663,6 @@ func basicEventReason(objKindName string, err error) string {
return fmt.Sprintf("%sSyncSuccessfull", strcase.ToCamel(objKindName))
}

func xenonHttpRequest(host, method, url string, rootPasswd []byte, body io.Reader) (io.ReadCloser, error) {
req, err := http.NewRequest(method, fmt.Sprintf("http://%s:%d%s", host, utils.XenonPeerPort, url), body)
if err != nil {
return nil, err
}
encoded := base64.StdEncoding.EncodeToString(append([]byte("root:"), rootPasswd...))
req.Header.Set("Authorization", "Basic "+encoded)

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}

if resp.StatusCode != 200 {
return nil, fmt.Errorf("get raft status failed, status code is %d", resp.StatusCode)
}

return resp.Body, nil
}

// check the backup is exist and running
func (s *StatefulSetSyncer) backupIsRunning(ctx context.Context) (bool, error) {
backuplist := apiv1alpha1.BackupList{}
Expand Down
84 changes: 19 additions & 65 deletions mysqlcluster/syncer/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,13 @@ package syncer

import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"time"

"github.com/presslabs/controller-util/syncer"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

apiv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
Expand All @@ -51,14 +47,17 @@ type StatusSyncer struct {

// Mysql query runner.
internal.SQLRunnerFactory
// XenonExecutor is used to execute Xenon HTTP instructions.
internal.XenonExecutor
}

// NewStatusSyncer returns a pointer to StatusSyncer.
func NewStatusSyncer(c *mysqlcluster.MysqlCluster, cli client.Client, sqlRunnerFactory internal.SQLRunnerFactory) *StatusSyncer {
func NewStatusSyncer(c *mysqlcluster.MysqlCluster, cli client.Client, sqlRunnerFactory internal.SQLRunnerFactory, xenonExecutor internal.XenonExecutor) *StatusSyncer {
return &StatusSyncer{
MysqlCluster: c,
cli: cli,
SQLRunnerFactory: sqlRunnerFactory,
XenonExecutor: xenonExecutor,
}
}

Expand Down Expand Up @@ -177,41 +176,16 @@ func (s *StatusSyncer) updateClusterStatus() apiv1alpha1.ClusterCondition {

// updateNodeStatus update the node status.
func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client, pods []corev1.Pod) error {
sctName := s.GetNameForResource(utils.Secret)
svcName := s.GetNameForResource(utils.HeadlessSVC)
nameSpace := s.Namespace

secret := &corev1.Secret{}
if err := cli.Get(context.TODO(),
types.NamespacedName{
Namespace: nameSpace,
Name: sctName,
},
secret,
); err != nil {
log.V(1).Info("secret not found", "name", sctName)
return nil
}

rootPasswd, ok := secret.Data["root-password"]
if !ok {
return fmt.Errorf("failed to get the root password: %s", rootPasswd)
}

for _, pod := range pods {
podName := pod.Name
host := fmt.Sprintf("%s.%s.%s", podName, svcName, nameSpace)
host := fmt.Sprintf("%s.%s.%s", podName, s.GetNameForResource(utils.HeadlessSVC), s.Namespace)
index := s.getNodeStatusIndex(host)
node := &s.Status.Nodes[index]
node.Message = ""

isLeader, err := checkRole(host, rootPasswd)
if err != nil {
log.Error(err, "failed to check the node role", "node", node.Name)
node.Message = err.Error()
if err := s.updateNodeRaftStatus(node); err != nil {
return err
}
// update apiv1alpha1.NodeConditionLeader.
s.updateNodeCondition(node, int(apiv1alpha1.IndexLeader), isLeader)

isLagged, isReplicating, isReadOnly := corev1.ConditionUnknown, corev1.ConditionUnknown, corev1.ConditionUnknown
sqlRunner, closeConn, err := s.SQLRunnerFactory(internal.NewConfigFromClusterKey(
Expand All @@ -234,7 +208,7 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client,
}

if !utils.ExistUpdateFile() &&
isLeader == corev1.ConditionTrue &&
node.RaftStatus.Role == string(utils.Leader) &&
isReadOnly != corev1.ConditionFalse {
log.V(1).Info("try to correct the leader writeable", "node", node.Name)
sqlRunner.QueryExec(internal.NewQuery("SET GLOBAL read_only=off"))
Expand Down Expand Up @@ -307,27 +281,21 @@ func (s *StatusSyncer) updateNodeCondition(node *apiv1alpha1.NodeStatus, idx int
}
}

// checkRole used to check whether the mysql role is leader.
func checkRole(host string, rootPasswd []byte) (corev1.ConditionStatus, error) {
body, err := xenonHttpRequest(host, "GET", "/v1/raft/status", rootPasswd, nil)
// updateNodeRaftStatus Update Node RaftStatus.
func (s *StatusSyncer) updateNodeRaftStatus(node *apiv1alpha1.NodeStatus) error {
raftStatus, err := s.XenonExecutor.RaftStatus(node.Name)
if err != nil {
return corev1.ConditionUnknown, err
}

var out map[string]interface{}
if err = unmarshalJSON(body, &out); err != nil {
return corev1.ConditionUnknown, err
}

if out["state"] == "LEADER" {
return corev1.ConditionTrue, nil
return err
}
node.RaftStatus = *raftStatus

if out["state"] == "FOLLOWER" {
return corev1.ConditionFalse, nil
isLeader := corev1.ConditionFalse
if node.RaftStatus.Role == string(utils.Leader) {
isLeader = corev1.ConditionTrue
}

return corev1.ConditionUnknown, nil
// update apiv1alpha1.NodeConditionLeader.
s.updateNodeCondition(node, int(apiv1alpha1.IndexLeader), isLeader)
return nil
}

// setPodHealthy set the pod lable healthy.
Expand All @@ -353,17 +321,3 @@ func (s *StatusSyncer) setPodHealthy(ctx context.Context, pod *corev1.Pod, node
}
return nil
}

func unmarshalJSON(in io.Reader, obj interface{}) error {
body, err := ioutil.ReadAll(in)
if err != nil {
return fmt.Errorf("io read error: %s", err)
}

if err = json.Unmarshal(body, obj); err != nil {
log.V(1).Info("error unmarshal data", "body", string(body))
return err
}

return nil
}
16 changes: 16 additions & 0 deletions utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ limitations under the License.
package utils

import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"sort"
"strconv"
Expand Down Expand Up @@ -120,3 +123,16 @@ func stringIn(str string, strs []string) (int, bool) {

return 0, false
}

func UnmarshalJSON(in io.Reader, obj interface{}) error {
body, err := ioutil.ReadAll(in)
if err != nil {
return fmt.Errorf("io read error: %s", err)
}

if err = json.Unmarshal(body, obj); err != nil {
return fmt.Errorf("error unmarshal data, error: %s, body: %s", err, string(body))
}

return nil
}
Loading

0 comments on commit 5df7a54

Please sign in to comment.