Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Optimize the logic of creating sqlrunner. #229 #230

Merged
merged 1 commit into from
Sep 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

apiv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
Expand Down Expand Up @@ -331,3 +333,19 @@ func sizeToBytes(s string) (uint64, error) {
}
return 0, fmt.Errorf("'%s' format error, must be a positive integer with a unit of measurement like K, M or G", s)
}

// GetClusterKey returns the MysqlUser's Cluster key.
func (c *Cluster) GetClusterKey() client.ObjectKey {
return client.ObjectKey{
Name: c.Name,
Namespace: c.Namespace,
}
}

// GetKey return the user key. Usually used for logging or for runtime.Client.Get as key.
func (c *Cluster) GetKey() client.ObjectKey {
return types.NamespacedName{
Namespace: c.Namespace,
Name: c.Name,
}
}
39 changes: 17 additions & 22 deletions cluster/syncer/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,13 @@ type StatefulSetSyncer struct {

// Secret resourceVersion.
sctRev string

// Mysql query runner.
internal.SQLRunnerFactory
}

// NewStatefulSetSyncer returns a pointer to StatefulSetSyncer.
func NewStatefulSetSyncer(cli client.Client, c *cluster.Cluster, cmRev, sctRev string) *StatefulSetSyncer {
func NewStatefulSetSyncer(cli client.Client, c *cluster.Cluster, cmRev, sctRev string, sqlRunnerFactory internal.SQLRunnerFactory) *StatefulSetSyncer {
return &StatefulSetSyncer{
Cluster: c,
cli: cli,
Expand All @@ -80,8 +83,9 @@ func NewStatefulSetSyncer(cli client.Client, c *cluster.Cluster, cmRev, sctRev s
Namespace: c.Namespace,
},
},
cmRev: cmRev,
sctRev: sctRev,
cmRev: cmRev,
sctRev: sctRev,
SQLRunnerFactory: sqlRunnerFactory,
}
}

Expand Down Expand Up @@ -258,6 +262,13 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
// 5. Check followerHost current role.
// 6. If followerHost is not leader, switch it to leader through xenon.
func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower string) error {
leaderRunner, closeConn, err := s.SQLRunnerFactory(internal.NewConfigFromClusterKey(
s.cli, s.Cluster.GetClusterKey(), utils.OperatorUser, utils.LeaderHost))
if err != nil {
return err
}
defer closeConn()

// Status.Replicas indicate the number of Pod has been created.
// So sfs.Spec.Replicas is 2, May be sfs.Status.Replicas maybe are 3, 5 ,
// because it do not update the pods, so it is still the last status.
Expand All @@ -272,7 +283,6 @@ func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower stri
defer utils.RemoveUpdateFile()
sctName := s.GetNameForResource(utils.Secret)
svcName := s.GetNameForResource(utils.HeadlessSVC)
port := utils.MysqlPort
nameSpace := s.Namespace

// Get secrets.
Expand All @@ -286,36 +296,21 @@ func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower stri
); err != nil {
return fmt.Errorf("failed to get the secret: %s", sctName)
}
user, ok := secret.Data["operator-user"]
if !ok {
return fmt.Errorf("failed to get the user: %s", user)
}
password, ok := secret.Data["operator-password"]
if !ok {
return fmt.Errorf("failed to get the password: %s", password)
}

rootPasswd, ok := secret.Data["root-password"]
if !ok {
return fmt.Errorf("failed to get the root password: %s", rootPasswd)
}

leaderHost := fmt.Sprintf("%s.%s.%s", leader, svcName, nameSpace)
leaderRunner, err := internal.NewSQLRunner(utils.BytesToString(user), utils.BytesToString(password), leaderHost, port)
if err != nil {
log.Error(err, "failed to connect the mysql", "node", leader)
return err
}
defer leaderRunner.Close()

if err = retry(time.Second*2, time.Duration(waitLimit)*time.Second, func() (bool, error) {
// Set leader read only.
if err = leaderRunner.RunQuery("SET GLOBAL super_read_only=on;"); err != nil {
if err = leaderRunner.QueryExec(internal.NewQuery("SET GLOBAL super_read_only=on;")); err != nil {
log.Error(err, "failed to set leader read only", "node", leader)
return false, err
}

// Make sure the master has sent all binlog to slave.
success, err := leaderRunner.CheckProcesslist()
success, err := internal.CheckProcesslist(leaderRunner)
if err != nil {
return false, err
}
Expand Down
36 changes: 15 additions & 21 deletions cluster/syncer/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,17 @@ type StatusSyncer struct {
*cluster.Cluster

cli client.Client

// Mysql query runner.
internal.SQLRunnerFactory
}

// NewStatusSyncer returns a pointer to StatusSyncer.
func NewStatusSyncer(c *cluster.Cluster, cli client.Client) *StatusSyncer {
func NewStatusSyncer(c *cluster.Cluster, cli client.Client, sqlRunnerFactory internal.SQLRunnerFactory) *StatusSyncer {
return &StatusSyncer{
Cluster: c,
cli: cli,
Cluster: c,
cli: cli,
SQLRunnerFactory: sqlRunnerFactory,
}
}

Expand Down Expand Up @@ -144,7 +148,6 @@ func (s *StatusSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {
func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client, pods []corev1.Pod) error {
sctName := s.GetNameForResource(utils.Secret)
svcName := s.GetNameForResource(utils.HeadlessSVC)
port := utils.MysqlPort
nameSpace := s.Namespace

secret := &corev1.Secret{}
Expand All @@ -158,14 +161,7 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client,
log.V(1).Info("secret not found", "name", sctName)
return nil
}
user, ok := secret.Data["operator-user"]
if !ok {
return fmt.Errorf("failed to get the user: %s", user)
}
password, ok := secret.Data["operator-password"]
if !ok {
return fmt.Errorf("failed to get the password: %s", password)
}

rootPasswd, ok := secret.Data["root-password"]
if !ok {
return fmt.Errorf("failed to get the root password: %s", rootPasswd)
Expand All @@ -187,18 +183,20 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client,
s.updateNodeCondition(node, int(apiv1alpha1.IndexLeader), isLeader)

isLagged, isReplicating, isReadOnly := corev1.ConditionUnknown, corev1.ConditionUnknown, corev1.ConditionUnknown
runner, err := internal.NewSQLRunner(utils.BytesToString(user), utils.BytesToString(password), host, port)
sqlRunner, closeConn, err := s.SQLRunnerFactory(internal.NewConfigFromClusterKey(
s.cli, s.Cluster.GetClusterKey(), utils.OperatorUser, host))
defer closeConn()
if err != nil {
log.Error(err, "failed to connect the mysql", "node", node.Name)
node.Message = err.Error()
} else {
isLagged, isReplicating, err = runner.CheckSlaveStatusWithRetry(checkNodeStatusRetry)
isLagged, isReplicating, err = internal.CheckSlaveStatusWithRetry(sqlRunner, checkNodeStatusRetry)
if err != nil {
log.Error(err, "failed to check slave status", "node", node.Name)
node.Message = err.Error()
}

isReadOnly, err = runner.CheckReadOnly()
isReadOnly, err = internal.CheckReadOnly(sqlRunner)
if err != nil {
log.Error(err, "failed to check read only", "node", node.Name)
node.Message = err.Error()
Expand All @@ -208,15 +206,11 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client,
isLeader == corev1.ConditionTrue &&
isReadOnly != corev1.ConditionFalse {
log.V(1).Info("try to correct the leader writeable", "node", node.Name)
runner.RunQuery("SET GLOBAL read_only=off")
runner.RunQuery("SET GLOBAL super_read_only=off")
sqlRunner.QueryExec(internal.NewQuery("SET GLOBAL read_only=off"))
sqlRunner.QueryExec(internal.NewQuery("SET GLOBAL super_read_only=off"))
}
}

if runner != nil {
runner.Close()
}

// update apiv1alpha1.NodeConditionLagged.
s.updateNodeCondition(node, int(apiv1alpha1.IndexLagged), isLagged)
// update apiv1alpha1.NodeConditionReplicating.
Expand Down
15 changes: 9 additions & 6 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

mysqlv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
"github.com/radondb/radondb-mysql-kubernetes/controllers"
"github.com/radondb/radondb-mysql-kubernetes/internal"
//+kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -79,17 +80,19 @@ func main() {
}

if err = (&controllers.ClusterReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("controller.cluster"),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("controller.cluster"),
SQLRunnerFactory: internal.NewSQLRunner,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Cluster")
os.Exit(1)
}
if err = (&controllers.StatusReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("controller.status"),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("controller.status"),
SQLRunnerFactory: internal.NewSQLRunner,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Status")
os.Exit(1)
Expand Down
6 changes: 5 additions & 1 deletion controllers/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@ import (
apiv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
"github.com/radondb/radondb-mysql-kubernetes/cluster"
clustersyncer "github.com/radondb/radondb-mysql-kubernetes/cluster/syncer"
"github.com/radondb/radondb-mysql-kubernetes/internal"
)

// ClusterReconciler reconciles a Cluster object
type ClusterReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder

// Mysql query runner.
internal.SQLRunnerFactory
}

// +kubebuilder:rbac:groups=mysql.radondb.com,resources=clusters,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -114,7 +118,7 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
clustersyncer.NewHeadlessSVCSyncer(r.Client, instance),
clustersyncer.NewLeaderSVCSyncer(r.Client, instance),
clustersyncer.NewFollowerSVCSyncer(r.Client, instance),
clustersyncer.NewStatefulSetSyncer(r.Client, instance, cmRev, sctRev),
clustersyncer.NewStatefulSetSyncer(r.Client, instance, cmRev, sctRev, r.SQLRunnerFactory),
clustersyncer.NewPDBSyncer(r.Client, instance),
}

Expand Down
6 changes: 5 additions & 1 deletion controllers/status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
apiv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
"github.com/radondb/radondb-mysql-kubernetes/cluster"
clustersyncer "github.com/radondb/radondb-mysql-kubernetes/cluster/syncer"
"github.com/radondb/radondb-mysql-kubernetes/internal"
)

// reconcileTimePeriod represents the time in which a cluster should be reconciled
Expand All @@ -50,6 +51,9 @@ type StatusReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder

// Mysql query runner.
internal.SQLRunnerFactory
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
Expand Down Expand Up @@ -88,7 +92,7 @@ func (r *StatusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}
}()

statusSyncer := clustersyncer.NewStatusSyncer(instance, r.Client)
statusSyncer := clustersyncer.NewStatusSyncer(instance, r.Client, r.SQLRunnerFactory)
if err := syncer.Sync(ctx, statusSyncer, r.Recorder); err != nil {
return ctrl.Result{}, err
}
Expand Down
55 changes: 55 additions & 0 deletions internal/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Copyright 2021 RadonDB.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package internal

import (
"strings"
"errors"
)

// Query contains a escaped query string with variables marked with a question mark (?) and a slice
// of positional arguments.
type Query struct {
escapedQuery string
args []interface{}
}

// String representation of the query.
func (q *Query) String() string {
return q.escapedQuery
}

// Args is used in test.
func (q *Query) Args() []interface{} {
return q.args
}

// NewQuery returns a new Query object.
func NewQuery(q string, args ...interface{}) Query {
if q == "" {
internalLog.Error(errors.New("SQLError"), "sql cannot be empty")
}

if !strings.HasSuffix(q, ";") {
q += ";"
}

return Query{
escapedQuery: q,
args: args,
}
}
Loading