From 5a471c84316c3dda6f2a7ca125bd3debcc6e3896 Mon Sep 17 00:00:00 2001 From: hualongzhong Date: Fri, 22 Jul 2022 15:18:16 +0800 Subject: [PATCH] xenon: add preStop timeout (#612) --- Dockerfile | 2 +- Dockerfile.sidecar | 2 +- build/mysql/Dockerfile | 2 +- build/xenon/Dockerfile | 2 +- cmd/xenon/main.go | 219 ++++++++++++++++++++++++++++-------- go.sum | 2 + mysqlcluster/syncer/role.go | 5 + 7 files changed, 180 insertions(+), 54 deletions(-) diff --git a/Dockerfile b/Dockerfile index 93ebae46..daa0463a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,7 +7,7 @@ COPY go.mod go.mod COPY go.sum go.sum # cache deps before building and copying source so that we don't need to re-download as much # and so that source changes don't invalidate our downloaded layer -# RUN go env -w GOPROXY=https://goproxy.cn,direct +RUN if [ $(cat /etc/timezone) = "Asia/Shanghai" ] ; then go env -w GOPROXY=https://goproxy.cn,direct; fi RUN go mod download # Copy the go source diff --git a/Dockerfile.sidecar b/Dockerfile.sidecar index 0eb0e9c1..e51dbadb 100644 --- a/Dockerfile.sidecar +++ b/Dockerfile.sidecar @@ -11,7 +11,7 @@ COPY go.sum go.sum # cache deps before building and copying source so that we don't need to re-download as much # and so that source changes don't invalidate our downloaded layer -# RUN go env -w GOPROXY=https://goproxy.cn,direct; +RUN if [ $(cat /etc/timezone) = "Asia/Shanghai" ] ; then go env -w GOPROXY=https://goproxy.cn,direct; fi # go mod download RUN go mod download diff --git a/build/mysql/Dockerfile b/build/mysql/Dockerfile index e005029e..766b4c45 100644 --- a/build/mysql/Dockerfile +++ b/build/mysql/Dockerfile @@ -6,7 +6,7 @@ COPY go.mod go.mod COPY go.sum go.sum # cache deps before building and copying source so that we don't need to re-download as much # and so that source changes don't invalidate our downloaded layer -RUN go env -w GOPROXY=https://goproxy.cn,direct; +RUN if [ $(cat /etc/timezone) = "Asia/Shanghai" ] ; then go env -w 
GOPROXY=https://goproxy.cn,direct; fi # go mod download RUN go mod download diff --git a/build/xenon/Dockerfile b/build/xenon/Dockerfile index ccd49003..d41e4b98 100644 --- a/build/xenon/Dockerfile +++ b/build/xenon/Dockerfile @@ -5,7 +5,7 @@ FROM golang:1.16 as builder ARG XENON_BRANCH=master -RUN go env -w GOPROXY=https://goproxy.cn,direct && go env -w GO111MODULE=off; +RUN if [ $(cat /etc/timezone) = "Asia/Shanghai" ] ; then go env -w GOPROXY=https://goproxy.cn,direct; fi && go env -w GO111MODULE=off; RUN set -ex; \ mkdir -p /go/src/github.com/radondb; \ cd /go/src/github.com/radondb; \ diff --git a/cmd/xenon/main.go b/cmd/xenon/main.go index 0bac9e33..09f20a71 100644 --- a/cmd/xenon/main.go +++ b/cmd/xenon/main.go @@ -6,6 +6,7 @@ import ( "database/sql" "encoding/json" "fmt" + "io" "os" "os/exec" "strconv" @@ -15,11 +16,26 @@ import ( _ "github.com/go-sql-driver/mysql" . "github.com/radondb/radondb-mysql-kubernetes/utils" log "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/remotecommand" ) +type KubeAPI struct { + Client *kubernetes.Clientset + Config *rest.Config +} + +type runRemoteCommandConfig struct { + container, namespace, podName string +} + const ( leaderStopCommand = "kill -9 $(pidof mysqld)" mysqlUser = "root" @@ -115,65 +131,80 @@ func leaderStop() error { log.Info("I am readonly, skip the leader stop") os.Exit(0) } + ch := make(chan error) + go func() { + defer func() { + if err := enableMyRaft(); err != nil { + log.Error(err) + } + }() - // Step 1: disable event scheduler - log.Info("Disabling event scheduler") - stmt, err := SetEventScheduler(conn, false) - if err != nil { - return fmt.Errorf("failed to disable event scheduler: %s", err.Error()) - } - log.Infof("set 
event scheduler to false: %s", stmt) + log.Info("Raft disable") + if err := disableMyRaft(); err != nil { + log.Errorf("failed to failover: %v", err) + ch <- err + } - // Step 2: set readonly - log.Info("Setting readonly") - stmt, err = SetReadOnly(conn, true) - if err != nil { - return fmt.Errorf("failed to set readonly: %s", err.Error()) - } - log.Infof("set readonly to true: %s", stmt) + // Step 1: disable event scheduler + log.Info("Disabling event scheduler") + stmt, err := SetEventScheduler(conn, false) + if err != nil { + ch <- err + } + log.Infof("set event scheduler to false: %s", stmt) - // Step 3: check long running writes - log.Info("Checking long running writes") - num, stmt, err := CheckLongRunningWrites(conn, 4) - if err != nil { - return fmt.Errorf("failed to check long running writes: %s", err.Error()) - } - log.Infof("%d,long running writes: %s", num, stmt) + // Step 2: set readonly + log.Info("Setting readonly") + stmt, err = SetReadOnly(conn, true) + if err != nil { + ch <- err + } + log.Infof("set readonly to true: %s", stmt) - // TODO: Step 4: set max connections + // Step 3: check long running writes + log.Info("Checking long running writes") + num, stmt, err := CheckLongRunningWrites(conn, 4) + if err != nil { + ch <- err + } + log.Infof("%d,long running writes: %s", num, stmt) - // Step 5: kill threads - log.Info("Killing threads") - err = KillThreads(conn) - if err != nil { - return fmt.Errorf("failed to kill threads: %s", err.Error()) - } + // TODO: Step 4: set max connections - // Step 6: FlushTablesWithReadLock - log.Info("Flushing tables with read lock") - stmt, err = FlushTablesWithReadLock(conn) - if err != nil { - return fmt.Errorf("failed to flush tables with read lock: %s", err.Error()) - } - log.Info("flushed tables with read lock: ", stmt) + // Step 5: kill threads + log.Info("Killing threads") + err = KillThreads(conn) + if err != nil { + ch <- err + } - // Step 7: FlushBinaryLogs - log.Info("Flushing binary logs") - stmt, err 
= FlushBinaryLogs(conn) - if err != nil { - return fmt.Errorf("failed to flush binary logs: %s", err.Error()) - } - log.Info("flushed binary logs:", stmt) + // Step 6: FlushTablesWithReadLock + log.Info("Flushing tables with read lock") + stmt, err = FlushTablesWithReadLock(conn) + if err != nil { + ch <- err + } + log.Info("flushed tables with read lock: ", stmt) - // Step 8: Failover - log.Info("Failover") - time.Sleep(2 * time.Second) - if err := DoFailOver(); err != nil { - return fmt.Errorf("failed to failover: %s", err.Error()) + // Step 7: FlushBinaryLogs + log.Info("Flushing binary logs") + stmt, err = FlushBinaryLogs(conn) + if err != nil { + ch <- err + } + log.Info("flushed binary logs:", stmt) + }() + select { + case err := <-ch: + return err + case <-time.After(5 * time.Second): + log.Info("timeout") + if err := killMysqld(); err != nil { + return err + } + return nil } - log.Info("leader stop finished") - return nil } func liveness() error { @@ -424,7 +455,7 @@ func DoFailOver() error { if err := disableMyRaft(); err != nil { return err } - return WaitForNewLeader(time.Minute * 2) + return nil } func enableMyRaft() error { @@ -534,3 +565,91 @@ func patchPodLabel(n MySQLNode, patch string) error { } return nil } + +func (k *KubeAPI) Exec(namespace, pod, container string, stdin io.Reader, command []string) (string, string, error) { + var stdout, stderr bytes.Buffer + + var Scheme = runtime.NewScheme() + if err := corev1.AddToScheme(Scheme); err != nil { + log.Fatalf("failed to add to scheme: %v", err) + return "", "", err + } + var ParameterCodec = runtime.NewParameterCodec(Scheme) + + request := k.Client.CoreV1().RESTClient().Post(). + Resource("pods").SubResource("exec"). + Namespace(namespace).Name(pod). 
+ VersionedParams(&corev1.PodExecOptions{ + Container: container, + Command: command, + Stdin: stdin != nil, + Stdout: true, + Stderr: true, + }, ParameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(k.Config, "POST", request.URL()) + + if err == nil { + err = exec.Stream(remotecommand.StreamOptions{ + Stdin: stdin, + Stdout: &stdout, + Stderr: &stderr, + }) + } + + return stdout.String(), stderr.String(), err +} + +func runRemoteCommand(kubeapi *KubeAPI, cfg runRemoteCommandConfig, cmd []string) (string, string, error) { + bashCmd := []string{"bash"} + reader := strings.NewReader(strings.Join(cmd, " ")) + return kubeapi.Exec(cfg.namespace, cfg.podName, cfg.container, reader, bashCmd) +} + +func NewForConfig(config *rest.Config) (*KubeAPI, error) { + var api KubeAPI + var err error + + api.Config = config + api.Client, err = kubernetes.NewForConfig(api.Config) + + return &api, err +} + +func NewConfig() (*rest.Config, error) { + // The default loading rules try to read from the files specified in the + // environment or from the home directory. + loader := clientcmd.NewDefaultClientConfigLoadingRules() + + // The deferred loader tries an in-cluster config if the default loading + // rules produce no results. 
+ return clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + loader, &clientcmd.ConfigOverrides{}).ClientConfig() +} + +func killMysqld() error { + config, err := NewConfig() + if err != nil { + panic(err) + } + k, err := NewForConfig(config) + if err != nil { + panic(err) + } + cfg := runRemoteCommandConfig{ + podName: podName, + namespace: ns, + container: "mysql", + } + + killMySQLCommand := []string{leaderStopCommand} + log.Infof("killing mysql command: %s", leaderStopCommand) + var output, stderr string + output, stderr, err = runRemoteCommand(k, cfg, killMySQLCommand) + log.Info("output=[" + output + "]") + log.Info("stderr=[" + stderr + "]") + if err != nil { + log.Fatal(err) + } + return nil +} diff --git a/go.sum b/go.sum index 49fc5919..e79d4c7c 100644 --- a/go.sum +++ b/go.sum @@ -99,6 +99,7 @@ github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= @@ -297,6 +298,7 @@ github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4 github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod 
h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8= github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c= github.com/moby/term v0.0.0-20201216013528-df9cb8a40635/go.mod h1:FBS0z0QWA44HXygs7VXDUOGoN/1TV3RuWkLO04am3wc= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= diff --git a/mysqlcluster/syncer/role.go b/mysqlcluster/syncer/role.go index 988d7da5..26e236e9 100644 --- a/mysqlcluster/syncer/role.go +++ b/mysqlcluster/syncer/role.go @@ -51,6 +51,11 @@ func NewRoleSyncer(cli client.Client, c *mysqlcluster.MysqlCluster) syncer.Inter APIGroups: []string{"batch"}, Resources: []string{"jobs"}, }, + { + Verbs: []string{"create"}, + APIGroups: []string{""}, + Resources: []string{"pods/exec"}, + }, } return nil })