Skip to content

Commit

Permalink
*: Optimize the logic of creating sqlrunner. *: add user management r…
Browse files Browse the repository at this point in the history
…elated operations.
  • Loading branch information
runkecheng committed Sep 22, 2021
1 parent b25c9cb commit 924f5ca
Show file tree
Hide file tree
Showing 9 changed files with 416 additions and 82 deletions.
27 changes: 27 additions & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

apiv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
Expand Down Expand Up @@ -331,3 +333,28 @@ func sizeToBytes(s string) (uint64, error) {
}
return 0, fmt.Errorf("'%s' format error, must be a positive integer with a unit of measurement like K, M or G", s)
}

// IsMysqlClusterKind for the given kind checks if CRD kind is for MysqlCluster CRD.
func IsMysqlClusterKind(kind string) bool {
switch kind {
case "MysqlCluster", "mysqlcluster", "mysqlclusters":
return true
}
return false
}

// GetClusterKey returns the MysqlUser's MySQLCluster key.
func (c *Cluster) GetClusterKey() client.ObjectKey {
return client.ObjectKey{
Name: c.Name,
Namespace: c.Namespace,
}
}

// GetKey return the user key. Usually used for logging or for runtime.Client.Get as key.
func (c *Cluster) GetKey() client.ObjectKey {
return types.NamespacedName{
Namespace: c.Namespace,
Name: c.Name,
}
}
38 changes: 16 additions & 22 deletions cluster/syncer/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,13 @@ type StatefulSetSyncer struct {

// Secret resourceVersion.
sctRev string

// mysql query runner
internal.SQLRunnerFactory
}

// NewStatefulSetSyncer returns a pointer to StatefulSetSyncer.
func NewStatefulSetSyncer(cli client.Client, c *cluster.Cluster, cmRev, sctRev string) *StatefulSetSyncer {
func NewStatefulSetSyncer(cli client.Client, c *cluster.Cluster, cmRev, sctRev string, sqlRunnerFactory internal.SQLRunnerFactory) *StatefulSetSyncer {
return &StatefulSetSyncer{
Cluster: c,
cli: cli,
Expand All @@ -80,8 +83,9 @@ func NewStatefulSetSyncer(cli client.Client, c *cluster.Cluster, cmRev, sctRev s
Namespace: c.Namespace,
},
},
cmRev: cmRev,
sctRev: sctRev,
cmRev: cmRev,
sctRev: sctRev,
SQLRunnerFactory: sqlRunnerFactory,
}
}

Expand Down Expand Up @@ -258,6 +262,12 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
// 5. Check followerHost current role.
// 6. If followerHost is not leader, switch it to leader through xenon.
func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower string) error {
sqlRunner, closeConn, err := s.SQLRunnerFactory(internal.NewConfigFromClusterKey(s.cli, s.Cluster.GetClusterKey(), utils.OperatorUser, string(utils.LeaderHost)))
if err != nil {
return err
}
defer closeConn()

// Status.Replicas indicate the number of Pod has been created.
// So sfs.Spec.Replicas is 2, May be sfs.Status.Replicas maybe are 3, 5 ,
// because it do not update the pods, so it is still the last status.
Expand All @@ -272,7 +282,6 @@ func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower stri
defer utils.RemoveUpdateFile()
sctName := s.GetNameForResource(utils.Secret)
svcName := s.GetNameForResource(utils.HeadlessSVC)
port := utils.MysqlPort
nameSpace := s.Namespace

// Get secrets.
Expand All @@ -286,36 +295,21 @@ func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower stri
); err != nil {
return fmt.Errorf("failed to get the secret: %s", sctName)
}
user, ok := secret.Data["operator-user"]
if !ok {
return fmt.Errorf("failed to get the user: %s", user)
}
password, ok := secret.Data["operator-password"]
if !ok {
return fmt.Errorf("failed to get the password: %s", password)
}

rootPasswd, ok := secret.Data["root-password"]
if !ok {
return fmt.Errorf("failed to get the root password: %s", rootPasswd)
}

leaderHost := fmt.Sprintf("%s.%s.%s", leader, svcName, nameSpace)
leaderRunner, err := internal.NewSQLRunner(utils.BytesToString(user), utils.BytesToString(password), leaderHost, port)
if err != nil {
log.Error(err, "failed to connect the mysql", "node", leader)
return err
}
defer leaderRunner.Close()

if err = retry(time.Second*2, time.Duration(waitLimit)*time.Second, func() (bool, error) {
// Set leader read only.
if err = leaderRunner.RunQuery("SET GLOBAL super_read_only=on;"); err != nil {
if err = sqlRunner.RunQuery("SET GLOBAL super_read_only=on;"); err != nil {
log.Error(err, "failed to set leader read only", "node", leader)
return false, err
}

// Make sure the master has sent all binlog to slave.
success, err := leaderRunner.CheckProcesslist()
success, err := sqlRunner.CheckProcesslist()
if err != nil {
return false, err
}
Expand Down
35 changes: 14 additions & 21 deletions cluster/syncer/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,17 @@ type StatusSyncer struct {
*cluster.Cluster

cli client.Client

// mysql query runner
internal.SQLRunnerFactory
}

// NewStatusSyncer returns a pointer to StatusSyncer.
func NewStatusSyncer(c *cluster.Cluster, cli client.Client) *StatusSyncer {
func NewStatusSyncer(c *cluster.Cluster, cli client.Client, sqlRunnerFactory internal.SQLRunnerFactory) *StatusSyncer {
return &StatusSyncer{
Cluster: c,
cli: cli,
Cluster: c,
cli: cli,
SQLRunnerFactory: sqlRunnerFactory,
}
}

Expand Down Expand Up @@ -144,7 +148,6 @@ func (s *StatusSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {
func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client, pods []corev1.Pod) error {
sctName := s.GetNameForResource(utils.Secret)
svcName := s.GetNameForResource(utils.HeadlessSVC)
port := utils.MysqlPort
nameSpace := s.Namespace

secret := &corev1.Secret{}
Expand All @@ -158,14 +161,7 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client,
log.V(1).Info("secret not found", "name", sctName)
return nil
}
user, ok := secret.Data["operator-user"]
if !ok {
return fmt.Errorf("failed to get the user: %s", user)
}
password, ok := secret.Data["operator-password"]
if !ok {
return fmt.Errorf("failed to get the password: %s", password)
}

rootPasswd, ok := secret.Data["root-password"]
if !ok {
return fmt.Errorf("failed to get the root password: %s", rootPasswd)
Expand All @@ -187,18 +183,19 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client,
s.updateNodeCondition(node, int(apiv1alpha1.IndexLeader), isLeader)

isLagged, isReplicating, isReadOnly := corev1.ConditionUnknown, corev1.ConditionUnknown, corev1.ConditionUnknown
runner, err := internal.NewSQLRunner(utils.BytesToString(user), utils.BytesToString(password), host, port)
sqlRunner, closeConn, err := s.SQLRunnerFactory(internal.NewConfigFromClusterKey(s.cli, s.Cluster.GetClusterKey(), utils.OperatorUser, host))
defer closeConn()
if err != nil {
log.Error(err, "failed to connect the mysql", "node", node.Name)
node.Message = err.Error()
} else {
isLagged, isReplicating, err = runner.CheckSlaveStatusWithRetry(checkNodeStatusRetry)
isLagged, isReplicating, err = sqlRunner.CheckSlaveStatusWithRetry(checkNodeStatusRetry)
if err != nil {
log.Error(err, "failed to check slave status", "node", node.Name)
node.Message = err.Error()
}

isReadOnly, err = runner.CheckReadOnly()
isReadOnly, err = sqlRunner.CheckReadOnly()
if err != nil {
log.Error(err, "failed to check read only", "node", node.Name)
node.Message = err.Error()
Expand All @@ -208,15 +205,11 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client,
isLeader == corev1.ConditionTrue &&
isReadOnly != corev1.ConditionFalse {
log.V(1).Info("try to correct the leader writeable", "node", node.Name)
runner.RunQuery("SET GLOBAL read_only=off")
runner.RunQuery("SET GLOBAL super_read_only=off")
sqlRunner.RunQuery("SET GLOBAL read_only=off")
sqlRunner.RunQuery("SET GLOBAL super_read_only=off")
}
}

if runner != nil {
runner.Close()
}

// update apiv1alpha1.NodeConditionLagged.
s.updateNodeCondition(node, int(apiv1alpha1.IndexLagged), isLagged)
// update apiv1alpha1.NodeConditionReplicating.
Expand Down
15 changes: 9 additions & 6 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

mysqlv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
"github.com/radondb/radondb-mysql-kubernetes/controllers"
"github.com/radondb/radondb-mysql-kubernetes/internal"
//+kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -79,17 +80,19 @@ func main() {
}

if err = (&controllers.ClusterReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("controller.cluster"),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("controller.cluster"),
SQLRunnerFactory: internal.NewSQLRunner,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Cluster")
os.Exit(1)
}
if err = (&controllers.StatusReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("controller.status"),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("controller.status"),
SQLRunnerFactory: internal.NewSQLRunner,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Status")
os.Exit(1)
Expand Down
6 changes: 5 additions & 1 deletion controllers/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@ import (
apiv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
"github.com/radondb/radondb-mysql-kubernetes/cluster"
clustersyncer "github.com/radondb/radondb-mysql-kubernetes/cluster/syncer"
"github.com/radondb/radondb-mysql-kubernetes/internal"
)

// ClusterReconciler reconciles a Cluster object
type ClusterReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder

// mysql query runner
internal.SQLRunnerFactory
}

// +kubebuilder:rbac:groups=mysql.radondb.com,resources=clusters,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -114,7 +118,7 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
clustersyncer.NewHeadlessSVCSyncer(r.Client, instance),
clustersyncer.NewLeaderSVCSyncer(r.Client, instance),
clustersyncer.NewFollowerSVCSyncer(r.Client, instance),
clustersyncer.NewStatefulSetSyncer(r.Client, instance, cmRev, sctRev),
clustersyncer.NewStatefulSetSyncer(r.Client, instance, cmRev, sctRev, r.SQLRunnerFactory),
clustersyncer.NewPDBSyncer(r.Client, instance),
}

Expand Down
6 changes: 5 additions & 1 deletion controllers/status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
apiv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
"github.com/radondb/radondb-mysql-kubernetes/cluster"
clustersyncer "github.com/radondb/radondb-mysql-kubernetes/cluster/syncer"
"github.com/radondb/radondb-mysql-kubernetes/internal"
)

// reconcileTimePeriod represents the time in which a cluster should be reconciled
Expand All @@ -50,6 +51,9 @@ type StatusReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder

// mysql query runner
internal.SQLRunnerFactory
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
Expand Down Expand Up @@ -88,7 +92,7 @@ func (r *StatusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}
}()

statusSyncer := clustersyncer.NewStatusSyncer(instance, r.Client)
statusSyncer := clustersyncer.NewStatusSyncer(instance, r.Client, r.SQLRunnerFactory)
if err := syncer.Sync(ctx, statusSyncer, r.Recorder); err != nil {
return ctrl.Result{}, err
}
Expand Down
79 changes: 79 additions & 0 deletions internal/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2021 RadonDB.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package internal

import "strings"

// Query contains a escaped query string with variables marked with a question mark (?) and a slice
// of positional arguments.
type Query struct {
escapedQuery string
args []interface{}
}

// String representation of the query.
func (q *Query) String() string {
return q.escapedQuery
}

// Args representation of the query.
func (q *Query) Args() []interface{} {
return q.args
}

// NewQuery returns a new Query object.
func NewQuery(q string, args ...interface{}) Query {
if q == "" {
panic("unexpected empty query")
}

if !strings.HasSuffix(q, ";") {
q += ";"
}

return Query{
escapedQuery: q,
args: args,
}
}

// ConcatenateQueries concatenates the provided queries into a single query.
func ConcatenateQueries(queries ...Query) Query {
args := []interface{}{}
query := ""

for _, pq := range queries {
if query != "" {
if !strings.HasSuffix(query, "\n") {
query += "\n"
}
}

query += pq.escapedQuery
args = append(args, pq.args...)
}

return NewQuery(query, args...)
}

// BuildAtomicQuery concatenates the provided queries into a single query wrapped in a BEGIN COMMIT block.
func BuildAtomicQuery(queries ...Query) Query {
queries = append([]Query{NewQuery("BEGIN")}, queries...)
queries = append(queries, NewQuery("COMMIT"))

return ConcatenateQueries(queries...)
}
Loading

0 comments on commit 924f5ca

Please sign in to comment.