From 300d5316b361f88888f015fd927052038a84fcf3 Mon Sep 17 00:00:00 2001 From: runkecheng <1131648942@qq.com> Date: Thu, 23 Sep 2021 11:25:19 +0800 Subject: [PATCH] *: Support user management through crd. #175 --- cluster/cluster.go | 11 +- cmd/manager/main.go | 9 + controllers/mysqluser_controller.go | 279 ++++++++++++++++++++++++++++ internal/sql_runner.go | 20 +- mysqluser/mysqluser.go | 74 ++++++++ mysqluser/status.go | 84 +++++++++ utils/common.go | 22 +++ 7 files changed, 488 insertions(+), 11 deletions(-) create mode 100644 controllers/mysqluser_controller.go create mode 100644 mysqluser/mysqluser.go create mode 100644 mysqluser/status.go diff --git a/cluster/cluster.go b/cluster/cluster.go index f42c7b9b8..d63402618 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -334,7 +334,16 @@ func sizeToBytes(s string) (uint64, error) { return 0, fmt.Errorf("'%s' format error, must be a positive integer with a unit of measurement like K, M or G", s) } -// GetClusterKey returns the MysqlUser's Cluster key. +// IsClusterKind for the given kind checks if CRD kind is for Cluster CRD. +func IsClusterKind(kind string) bool { + switch kind { + case "Cluster", "cluster", "clusters": + return true + } + return false +} + +// GetClusterKey returns the MysqlUser's MySQLCluster key. func (c *Cluster) GetClusterKey() client.ObjectKey { return client.ObjectKey{ Name: c.Name, diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 4bee7e5c7..3e93cd3b8 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -105,6 +105,15 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "Backup") os.Exit(1) } + if err = (&controllers.MysqlUserReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Recorder: mgr.GetEventRecorderFor("controller.mysqluser"), + SQLRunnerFactory: internal.NewSQLRunner, + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "MysqlUser") + os.Exit(1) + } //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { diff --git a/controllers/mysqluser_controller.go b/controllers/mysqluser_controller.go new file mode 100644 index 000000000..e3372e5b9 --- /dev/null +++ b/controllers/mysqluser_controller.go @@ -0,0 +1,279 @@ +/* +Copyright 2021 RadonDB. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "fmt" + "reflect" + "time" + + "github.com/go-test/deep" + "github.com/presslabs/controller-util/meta" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + apiv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1" + mysqlcluster "github.com/radondb/radondb-mysql-kubernetes/cluster" + "github.com/radondb/radondb-mysql-kubernetes/internal" + mysqluser "github.com/radondb/radondb-mysql-kubernetes/mysqluser" + "github.com/radondb/radondb-mysql-kubernetes/utils" +) + +// MysqlUserReconciler reconciles a MysqlUser object. +type MysqlUserReconciler struct { + client.Client + Scheme *runtime.Scheme + Recorder record.EventRecorder + + // MySQL query runner. + internal.SQLRunnerFactory +} + +var ( + userLog = log.Log.WithName("controller").WithName("mysqluser") + userFinalizer = "mysqluser-finalizer" +) + +//+kubebuilder:rbac:groups=mysql.radondb.com,resources=mysqlusers,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=mysql.radondb.com,resources=mysqlusers/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=mysql.radondb.com,resources=mysqlusers/finalizers,verbs=update + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// Modify the Reconcile function to compare the state specified by +// the MysqlUser object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the MysqlUser. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.8.3/pkg/reconcile +func (r *MysqlUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + // your logic here. + user := mysqluser.New(&apiv1alpha1.MysqlUser{}) + + err := r.Get(ctx, req.NamespacedName, user.Unwrap()) + if err != nil { + if errors.IsNotFound(err) { + // Object not found, return. Created objects are automatically garbage collected. + // For additional cleanup logic use finalizers. + userLog.Info("mysql user not found, maybe deleted") + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + oldStatus := user.Status.DeepCopy() + + // If mysql user has been deleted then delete it from mysql cluster. + if !user.ObjectMeta.DeletionTimestamp.IsZero() { + return ctrl.Result{}, r.removeUser(ctx, user) + } + + // Write the desired status into mysql cluster. + ruErr := r.reconcileUserInCluster(ctx, user) + if err := r.updateStatusAndErr(ctx, user, oldStatus, ruErr); err != nil { + return ctrl.Result{}, err + } + + // Enqueue the resource again after to keep the resource up to date in mysql + // in case is changed directly into mysql. + return ctrl.Result{ + Requeue: true, + RequeueAfter: 2 * time.Minute, + }, nil +} + +// removeUser deletes the corresponding user in mysql before mysql user cr is deleted. +func (r *MysqlUserReconciler) removeUser(ctx context.Context, mysqlUser *mysqluser.MysqlUser) error { + // The resource has been deleted. + if meta.HasFinalizer(&mysqlUser.ObjectMeta, userFinalizer) { + // Drop the user if the finalizer is still present. + if err := r.dropUserFromDB(ctx, mysqlUser); err != nil { + return err + } + + meta.RemoveFinalizer(&mysqlUser.ObjectMeta, userFinalizer) + + // Update resource so it will remove the finalizer. + if err := r.Update(ctx, mysqlUser.Unwrap()); err != nil { + return err + } + } + return nil +} + +// reconcileUserInCluster reconcileUserInCluster creates or updates users in mysql. +// Proceed as follows: +// 1. Create users and authorize according to the Spec. +// 2. Remove the host that does not exist in the spec from MySQL. +// 3. Make sure mysqluser has finalizer set. +// 4. Update status and condition. +func (r *MysqlUserReconciler) reconcileUserInCluster(ctx context.Context, mysqlUser *mysqluser.MysqlUser) (err error) { + // Catch the error and set the failed status. + defer setFailedStatus(&err, mysqlUser) + + // Reconcile the mysqlUser into mysql. + if err = r.reconcileUserInDB(ctx, mysqlUser); err != nil { + return + } + + // Add finalizer if is not added on the resource. + if !meta.HasFinalizer(&mysqlUser.ObjectMeta, userFinalizer) { + meta.AddFinalizer(&mysqlUser.ObjectMeta, userFinalizer) + if err = r.Update(ctx, mysqlUser.Unwrap()); err != nil { + return + } + } + + // Update status for allowedHosts if needed, mark that status need to be updated. + if !reflect.DeepEqual(mysqlUser.Status.AllowedHosts, mysqlUser.Spec.Hosts) { + mysqlUser.Status.AllowedHosts = mysqlUser.Spec.Hosts + } + + // Update the status according to the result. + mysqlUser.UpdateStatusCondition( + apiv1alpha1.MySQLUserReady, corev1.ConditionTrue, + mysqluser.ProvisionSucceededReason, "The user provisioning has succeeded.", + ) + + return +} + +// reconcileUserInDB creates and authorizes(If needed) users based on +// spec.Hosts, and then deletes users that do not exist in spec.Hosts. +func (r *MysqlUserReconciler) reconcileUserInDB(ctx context.Context, mysqlUser *mysqluser.MysqlUser) error { + sqlRunner, closeConn, err := r.SQLRunnerFactory(internal.NewConfigFromClusterKey( + r.Client, mysqlUser.GetClusterKey(), utils.RootUser, utils.LeaderHost)) + if err != nil { + return err + } + defer closeConn() + + secret := &corev1.Secret{} + secretKey := client.ObjectKey{Name: mysqlUser.Spec.SecretBinder.SecretName, Namespace: mysqlUser.Namespace} + + if err := r.Get(ctx, secretKey, secret); err != nil { + return err + } + + password := string(secret.Data[mysqlUser.Spec.SecretBinder.SecretKey]) + if password == "" { + return fmt.Errorf("the MySQL user's password must not be empty") + } + + // Create/Update user in database. + userLog.Info("creating mysql user", "key", mysqlUser.GetKey(), "username", mysqlUser.Spec.User, "cluster", mysqlUser.GetClusterKey()) + if err := internal.CreateUserIfNotExists(sqlRunner, mysqlUser.Spec.User, password, mysqlUser.Spec.Hosts, + mysqlUser.Spec.Permissions); err != nil { + return err + } + + // Remove allowed hosts for user. + toRemove := stringDiffIn(mysqlUser.Status.AllowedHosts, mysqlUser.Spec.Hosts) + for _, host := range toRemove { + if err := internal.DropUser(sqlRunner, mysqlUser.Spec.User, host); err != nil { + return err + } + } + + return nil +} + +func stringDiffIn(actual, desired []string) []string { + diff := []string{} + for _, aStr := range actual { + // If is not in the desired list remove it. + if _, exists := stringIn(aStr, desired); !exists { + diff = append(diff, aStr) + } + } + + return diff +} + +func stringIn(str string, strs []string) (int, bool) { + for i, s := range strs { + if s == str { + return i, true + } + } + return 0, false +} + +func (r *MysqlUserReconciler) dropUserFromDB(ctx context.Context, mysqlUser *mysqluser.MysqlUser) error { + sqlRunner, closeConn, err := r.SQLRunnerFactory(internal.NewConfigFromClusterKey( + r.Client, mysqlUser.GetClusterKey(), utils.RootUser, utils.LeaderHost)) + if errors.IsNotFound(err) { + // If the mysql cluster does not exists then we can safely assume that + // the user is deleted so exist successfully. + statusErr, ok := err.(*errors.StatusError) + if ok && mysqlcluster.IsClusterKind(statusErr.Status().Details.Kind) { + // It seems the cluster is not to be found, so we assume it has been deleted. + return nil + } + } + + if err != nil { + return err + } + defer closeConn() + + for _, host := range mysqlUser.Status.AllowedHosts { + userLog.Info("removing user from mysql cluster", "key", mysqlUser.GetKey(), "username", mysqlUser.Spec.User, "cluster", mysqlUser.GetClusterKey()) + if err := internal.DropUser(sqlRunner, mysqlUser.Spec.User, host); err != nil { + return err + } + } + return nil +} + +// updateStatusAndErr update the status and catch create/update error. +func (r *MysqlUserReconciler) updateStatusAndErr(ctx context.Context, mysqlUser *mysqluser.MysqlUser, oldStatus *apiv1alpha1.UserStatus, cuErr error) error { + if !reflect.DeepEqual(oldStatus, &mysqlUser.Status) { + userLog.Info("update mysql user status", "key", mysqlUser.GetKey(), "diff", deep.Equal(oldStatus, &mysqlUser.Status)) + if err := r.Status().Update(ctx, mysqlUser.Unwrap()); err != nil { + if cuErr != nil { + return fmt.Errorf("failed to update status: %s, previous error was: %s", err, cuErr) + } + return err + } + } + + return cuErr +} + +func setFailedStatus(err *error, mysqlUser *mysqluser.MysqlUser) { + if *err != nil { + mysqlUser.UpdateStatusCondition( + apiv1alpha1.MySQLUserReady, corev1.ConditionFalse, + mysqluser.ProvisionFailedReason, fmt.Sprintf("The user provisioning has failed: %s", *err), + ) + } +} + +// SetupWithManager sets up the controller with the Manager. +func (r *MysqlUserReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&apiv1alpha1.MysqlUser{}). + Complete(r) +} diff --git a/internal/sql_runner.go b/internal/sql_runner.go index d16cf585f..0e486dda1 100644 --- a/internal/sql_runner.go +++ b/internal/sql_runner.go @@ -326,27 +326,27 @@ func columnValue(scanArgs []interface{}, slaveCols []string, colName string) str } // CreateUserIfNotExists creates a user if it doesn't already exist and it gives it the specified permissions. -func (s sqlRunner) CreateUserIfNotExists( - user, pass string, allowedHosts []string, permissions []apiv1alpha1.UserPermission, +func CreateUserIfNotExists( + sqlRunner SQLRunner, user, pass string, hosts []string, permissions []apiv1alpha1.UserPermission, ) error { // Throw error if there are no allowed hosts. - if len(allowedHosts) == 0 { + if len(hosts) == 0 { return errors.New("no allowedHosts specified") } queries := []Query{ - getCreateUserQuery(user, pass, allowedHosts), - // todo: getAlterUserQuery + getCreateUserQuery(user, pass, hosts), + // todo: getAlterUserQuery. } if len(permissions) > 0 { - queries = append(queries, permissionsToQuery(permissions, user, allowedHosts)) + queries = append(queries, permissionsToQuery(permissions, user, hosts)) } query := BuildAtomicQuery(queries...) - if err := s.QueryExec(query); err != nil { + if err := sqlRunner.QueryExec(query); err != nil { return fmt.Errorf("failed to configure user (user/pass/access), err: %s", err) } @@ -379,10 +379,10 @@ func getUsersIdentification(user string, pwd *string, allowedHosts []string) (id } // DropUser removes a MySQL user if it exists, along with its privileges. -func (s sqlRunner) DropUser(user, host string) error { +func DropUser(sqlRunner SQLRunner, user, host string) error { query := NewQuery("DROP USER IF EXISTS ?@?;", user, host) - if err := s.QueryExec(query); err != nil { + if err := sqlRunner.QueryExec(query); err != nil { return fmt.Errorf("failed to delete user, err: %s", err) } @@ -422,7 +422,7 @@ func escapeID(id string) string { return id } - // don't allow using ` in id name + // don't allow using ` in id name. id = strings.ReplaceAll(id, "`", "") return fmt.Sprintf("`%s`", id) diff --git a/mysqluser/mysqluser.go b/mysqluser/mysqluser.go new file mode 100644 index 000000000..173381eff --- /dev/null +++ b/mysqluser/mysqluser.go @@ -0,0 +1,74 @@ +/* +Copyright 2021 RadonDB. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysqluser + +import ( + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + apiv1alhpa1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1" +) + +const ( + // ProvisionFailedReason is the condition reason when MysqlUser provisioning + // has failed. + ProvisionFailedReason = "ProvisionFailed" + // ProvisionInProgressReason is the reason when MysqlUser provisioning has + // started. + ProvisionInProgressReason = "ProvisionInProgress" + + // ProvisionSucceededReason the reason used when provision was successful. + ProvisionSucceededReason = "ProvisionSucceeded" +) + +// MysqlUser is a type wrapper over MysqlUser that contains the Business logic. +type MysqlUser struct { + *apiv1alhpa1.MysqlUser +} + +// New returns a wraper object over MysqlUser. +func New(mysqlUser *apiv1alhpa1.MysqlUser) *MysqlUser { + return &MysqlUser{ + MysqlUser: mysqlUser, + } +} + +// Unwrap returns the api MysqlUser object. +func (u *MysqlUser) Unwrap() *apiv1alhpa1.MysqlUser { + return u.MysqlUser +} + +// GetClusterKey returns the MysqlUser's MySQLCluster key. +func (u *MysqlUser) GetClusterKey() client.ObjectKey { + ns := u.Spec.ClusterBinder.NameSpace + if ns == "" { + ns = u.Namespace + } + + return client.ObjectKey{ + Name: u.Spec.ClusterBinder.ClusterName, + Namespace: ns, + } +} + +// GetKey return the user key. Usually used for logging or for runtime.Client.Get as key. +func (u *MysqlUser) GetKey() client.ObjectKey { + return types.NamespacedName{ + Namespace: u.Namespace, + Name: u.Name, + } +} diff --git a/mysqluser/status.go b/mysqluser/status.go new file mode 100644 index 000000000..c08dc851d --- /dev/null +++ b/mysqluser/status.go @@ -0,0 +1,84 @@ +/* +Copyright 2021 RadonDB. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mysqluser + +import ( + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + apiv1alhpa1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1" +) + +// UpdateStatusCondition sets the condition to a status. +// for example Ready condition to True, or False. +func (u *MysqlUser) UpdateStatusCondition( + condType apiv1alhpa1.MysqlUserConditionType, + status corev1.ConditionStatus, reason, message string, +) ( + cond *apiv1alhpa1.MySQLUserCondition, changed bool, +) { + t := metav1.NewTime(time.Now()) + + existingCondition, exists := u.ConditionExists(condType) + if !exists { + newCondition := apiv1alhpa1.MySQLUserCondition{ + Type: condType, + Status: status, + Reason: reason, + Message: message, + LastTransitionTime: t, + LastUpdateTime: t, + } + u.Status.Conditions = append(u.Status.Conditions, newCondition) + + return &newCondition, true + } + + if status != existingCondition.Status { + existingCondition.LastTransitionTime = t + changed = true + } + + if message != existingCondition.Message || reason != existingCondition.Reason { + existingCondition.LastUpdateTime = t + changed = true + } + + existingCondition.Status = status + existingCondition.Message = message + existingCondition.Reason = reason + + return existingCondition, changed +} + +// ConditionExists returns a condition and whether it exists. +func (u *MysqlUser) ConditionExists( + ct apiv1alhpa1.MysqlUserConditionType, +) ( + *apiv1alhpa1.MySQLUserCondition, bool, +) { + for i := range u.Status.Conditions { + cond := &u.Status.Conditions[i] + if cond.Type == ct { + return cond, true + } + } + + return nil, false +} diff --git a/utils/common.go b/utils/common.go index 4f48ede6c..c4b3923c0 100644 --- a/utils/common.go +++ b/utils/common.go @@ -98,3 +98,25 @@ func BuildBackupName() string { return fmt.Sprintf("backup_%v%v%v%v%v%v", cur_time.Year(), int(cur_time.Month()), cur_time.Day(), cur_time.Hour(), cur_time.Minute(), cur_time.Second()) } + +func StringDiffIn(actual, desired []string) []string { + diff := []string{} + for _, aStr := range actual { + // if is not in the desired list remove it. + if _, exists := stringIn(aStr, desired); !exists { + diff = append(diff, aStr) + } + } + + return diff +} + +func stringIn(str string, strs []string) (int, bool) { + for i, s := range strs { + if s == str { + return i, true + } + } + + return 0, false +}