Skip to content

Commit

Permalink
*: try leader by label
Browse files Browse the repository at this point in the history
  • Loading branch information
acekingke committed Sep 21, 2023
1 parent 19c80ef commit 18939ac
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 1 deletion.
12 changes: 12 additions & 0 deletions cmd/xenon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,12 @@ func leaderStop() error {
ch <- err
}
log.Info("flushed binary logs:", stmt)
// Step 8: Unlock tables
stmt, err = UnlockTables(conn)
if err != nil {
ch <- err
}
log.Info("unlock tables:", stmt)
pidfile, err = getPidFile(conn)
if err != nil {
ch <- err
Expand Down Expand Up @@ -418,6 +424,12 @@ func FlushTablesWithReadLock(db *sql.DB) (string, error) {
return query, err
}

func UnlockTables(db *sql.DB) (string, error) {
query := "UNLOCK TABLES"
_, err := db.Exec(query)
return query, err
}

func FlushBinaryLogs(db *sql.DB) (string, error) {
_, err := db.Exec("FLUSH BINARY LOGS")
return "FLUSH BINARY LOGS", err
Expand Down
12 changes: 12 additions & 0 deletions internal/pod_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,15 @@ func (p *PodExecutor) CloseXenonSemiCheck(namespace, podName string) error {
}
return nil
}

func (p *PodExecutor) XenonTryLeader(namespace, podName string) error {
cmd := []string{"xenoncli", "raft", "trytoleader"}
_, stderr, err := p.Exec(namespace, podName, "xenon", cmd...)
if err != nil {
return err
}
if len(stderr) != 0 {
return fmt.Errorf("run command %s in xenon failed: %s", cmd, stderr)
}
return nil
}
81 changes: 80 additions & 1 deletion mysqlcluster/syncer/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,24 @@ func (s *StatusSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {

// get ready nodes.
var readyNodes []corev1.Pod
var PodLeader, PodTryLeader *corev1.Pod
PodLeader, PodTryLeader = nil, nil
for _, pod := range list.Items {
if len(pod.ObjectMeta.Labels[utils.LableRebuild]) > 0 {
if err := s.AutoRebuild(ctx, &pod, list.Items); err != nil {
s.log.Error(err, "failed to AutoRebuild", "pod", pod.Name, "namespace", pod.Namespace)
}
continue
}
if pod.ObjectMeta.Labels != nil {
if len(pod.ObjectMeta.Labels[utils.LabelTryLeader]) != 0 {
PodTryLeader = &pod
}
if pod.ObjectMeta.Labels["role"] == string(utils.Leader) {
PodLeader = &pod
}
}

for _, cond := range pod.Status.Conditions {
switch cond.Type {
case corev1.ContainersReady:
Expand All @@ -142,7 +153,17 @@ func (s *StatusSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {
}
}
}

// try leader
if PodTryLeader != nil {
if PodLeader != nil {
if err := s.SetLeaderReadOnly(PodLeader); err != nil {
s.log.Info("set leader readonly", "error", err.Error())
}
}
if err := s.TryLeader(ctx, PodTryLeader); err != nil {
s.log.Error(err, "failed to Try leader", "pod", PodTryLeader.Name, "namespace", PodTryLeader.Namespace)
}
}
s.Status.ReadyNodes = len(readyNodes)
if s.Status.ReadyNodes == int(*s.Spec.Replicas) && int(*s.Spec.Replicas) != 0 {
if err := s.reconcileXenon(s.Status.ReadyNodes); err != nil {
Expand Down Expand Up @@ -802,3 +823,61 @@ func (s *StatusSyncer) DoRoRebuild(ctx context.Context, pod *corev1.Pod, items [
}
return nil
}

func (s *StatusSyncer) TryLeader(ctx context.Context, pod *corev1.Pod) error {

// 1. close the xenon's SemiCheck.
executor, err := internal.NewPodExecutor()
if err != nil {
return err
}

err = executor.XenonTryLeader(s.Namespace, pod.Name)
s.log.Info("the xenon's tryleader", "pod", pod.Name)
delete(pod.ObjectMeta.Labels, utils.LabelTryLeader)
if err != nil {
return err
}

if err := s.cli.Update(ctx, pod); err != nil {
return err
}
return nil
}

func (s *StatusSyncer) SetLeaderReadOnly(pod *corev1.Pod) error {
var sqlRunner internal.SQLRunner
closeCh := make(chan func())

var closeConn func()
errCh := make(chan error)
host := fmt.Sprintf("%s.%s-mysql.%s", pod.Name, s.Name, s.Namespace)
cfg, errOut := internal.NewConfigFromClusterKey(
s.cli, s.MysqlCluster.GetClusterKey(), utils.RootUser, host)
go func(sqlRunner *internal.SQLRunner, errCh chan error, closeCh chan func()) {
var err error
*sqlRunner, closeConn, err = s.SQLRunnerFactory(cfg, errOut)
if err != nil {
s.log.V(1).Info("failed to get sql runner", "error", err)
errCh <- err
return
}
if closeConn != nil {
closeCh <- closeConn
return
}
errCh <- nil
}(&sqlRunner, errCh, closeCh)

select {
case errOut = <-errCh:
return errOut
case closeConn := <-closeCh:
defer closeConn()
case <-time.After(time.Second * 5):
}
if sqlRunner != nil {
return sqlRunner.QueryExec(internal.NewQuery("SET GLOBAL super_read_only=on"))
}
return nil
}
1 change: 1 addition & 0 deletions utils/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ const (
const LableRebuild = "rebuild"
const LabelRebuildFrom = "rebuild-from"
const LabelMaintain = "maintain"
const LabelTryLeader = "tryleader"

// XenonHttpUrl is a http url corresponding to the xenon instruction.
type XenonHttpUrl string
Expand Down

0 comments on commit 18939ac

Please sign in to comment.