Skip to content

Commit

Permalink
xenon: add preStop timeout (#612)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhl003 authored Jul 22, 2022
1 parent 348dc2a commit 5a471c8
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 54 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ COPY go.mod go.mod
COPY go.sum go.sum
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
# RUN go env -w GOPROXY=https://goproxy.cn,direct
RUN if [ $(cat /etc/timezone) = "Asia/Shanghai" ] ; then go env -w GOPROXY=https://goproxy.cn,direct; fi
RUN go mod download

# Copy the go source
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.sidecar
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ COPY go.sum go.sum

# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
# RUN go env -w GOPROXY=https://goproxy.cn,direct;
RUN if [ $(cat /etc/timezone) = "Asia/Shanghai" ] ; then go env -w GOPROXY=https://goproxy.cn,direct; fi
# go mod download
RUN go mod download

Expand Down
2 changes: 1 addition & 1 deletion build/mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ COPY go.mod go.mod
COPY go.sum go.sum
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
RUN go env -w GOPROXY=https://goproxy.cn,direct;
RUN if [ $(cat /etc/timezone) = "Asia/Shanghai" ] ; then go env -w GOPROXY=https://goproxy.cn,direct; fi
# go mod download
RUN go mod download

Expand Down
2 changes: 1 addition & 1 deletion build/xenon/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
FROM golang:1.16 as builder

ARG XENON_BRANCH=master
RUN go env -w GOPROXY=https://goproxy.cn,direct && go env -w GO111MODULE=off;
# Use the goproxy.cn mirror only when the build image reports the Asia/Shanghai
# timezone; a missing /etc/timezone is treated as "not Shanghai".
RUN if [ "$(cat /etc/timezone 2>/dev/null)" = "Asia/Shanghai" ]; then go env -w GOPROXY=https://goproxy.cn,direct; fi && go env -w GO111MODULE=off;
RUN set -ex; \
mkdir -p /go/src/github.com/radondb; \
cd /go/src/github.com/radondb; \
Expand Down
219 changes: 169 additions & 50 deletions cmd/xenon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"database/sql"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"strconv"
Expand All @@ -15,11 +16,26 @@ import (
_ "github.com/go-sql-driver/mysql"
. "github.com/radondb/radondb-mysql-kubernetes/utils"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
)

// KubeAPI bundles a Kubernetes clientset with the REST configuration it was
// built from, so that helpers such as Exec can both issue API requests and
// open streaming connections.
type KubeAPI struct {
// Client is the typed clientset used to build API requests.
Client *kubernetes.Clientset
// Config is the REST configuration; Exec passes it to the SPDY executor.
Config *rest.Config
}

// runRemoteCommandConfig identifies the container in which runRemoteCommand
// executes a command.
type runRemoteCommandConfig struct {
	// container is the name of the target container inside the pod.
	container string
	// namespace is the namespace the pod lives in.
	namespace string
	// podName is the name of the target pod.
	podName string
}

const (
leaderStopCommand = "kill -9 $(pidof mysqld)"
mysqlUser = "root"
Expand Down Expand Up @@ -115,65 +131,80 @@ func leaderStop() error {
log.Info("I am readonly, skip the leader stop")
os.Exit(0)
}
ch := make(chan error)
go func() {
defer func() {
if err := enableMyRaft(); err != nil {
log.Error(err)
}
}()

// Step 1: disable event scheduler
log.Info("Disabling event scheduler")
stmt, err := SetEventScheduler(conn, false)
if err != nil {
return fmt.Errorf("failed to disable event scheduler: %s", err.Error())
}
log.Infof("set event scheduler to false: %s", stmt)
log.Info("Raft disable")
if err := disableMyRaft(); err != nil {
log.Errorf("failed to failover: %v", err)
ch <- err
}

// Step 2: set readonly
log.Info("Setting readonly")
stmt, err = SetReadOnly(conn, true)
if err != nil {
return fmt.Errorf("failed to set readonly: %s", err.Error())
}
log.Infof("set readonly to true: %s", stmt)
// Step 1: disable event scheduler
log.Info("Disabling event scheduler")
stmt, err := SetEventScheduler(conn, false)
if err != nil {
ch <- err
}
log.Infof("set event scheduler to false: %s", stmt)

// Step 3: check long running writes
log.Info("Checking long running writes")
num, stmt, err := CheckLongRunningWrites(conn, 4)
if err != nil {
return fmt.Errorf("failed to check long running writes: %s", err.Error())
}
log.Infof("%d,long running writes: %s", num, stmt)
// Step 2: set readonly
log.Info("Setting readonly")
stmt, err = SetReadOnly(conn, true)
if err != nil {
ch <- err
}
log.Infof("set readonly to true: %s", stmt)

// TODO: Step 4: set max connections
// Step 3: check long running writes
log.Info("Checking long running writes")
num, stmt, err := CheckLongRunningWrites(conn, 4)
if err != nil {
ch <- err
}
log.Infof("%d,long running writes: %s", num, stmt)

// Step 5: kill threads
log.Info("Killing threads")
err = KillThreads(conn)
if err != nil {
return fmt.Errorf("failed to kill threads: %s", err.Error())
}
// TODO: Step 4: set max connections

// Step 6: FlushTablesWithReadLock
log.Info("Flushing tables with read lock")
stmt, err = FlushTablesWithReadLock(conn)
if err != nil {
return fmt.Errorf("failed to flush tables with read lock: %s", err.Error())
}
log.Info("flushed tables with read lock: ", stmt)
// Step 5: kill threads
log.Info("Killing threads")
err = KillThreads(conn)
if err != nil {
ch <- err
}

// Step 7: FlushBinaryLogs
log.Info("Flushing binary logs")
stmt, err = FlushBinaryLogs(conn)
if err != nil {
return fmt.Errorf("failed to flush binary logs: %s", err.Error())
}
log.Info("flushed binary logs:", stmt)
// Step 6: FlushTablesWithReadLock
log.Info("Flushing tables with read lock")
stmt, err = FlushTablesWithReadLock(conn)
if err != nil {
ch <- err
}
log.Info("flushed tables with read lock: ", stmt)

// Step 8: Failover
log.Info("Failover")
time.Sleep(2 * time.Second)
if err := DoFailOver(); err != nil {
return fmt.Errorf("failed to failover: %s", err.Error())
// Step 7: FlushBinaryLogs
log.Info("Flushing binary logs")
stmt, err = FlushBinaryLogs(conn)
if err != nil {
ch <- err
}
log.Info("flushed binary logs:", stmt)
}()
select {
case err := <-ch:
return err
case <-time.After(5 * time.Second):
log.Info("timeout")
if err := killMysqld(); err != nil {
return err
}
return nil
}

log.Info("leader stop finished")
return nil
}

func liveness() error {
Expand Down Expand Up @@ -424,7 +455,7 @@ func DoFailOver() error {
if err := disableMyRaft(); err != nil {
return err
}
return WaitForNewLeader(time.Minute * 2)
return nil
}

func enableMyRaft() error {
Expand Down Expand Up @@ -534,3 +565,91 @@ func patchPodLabel(n MySQLNode, patch string) error {
}
return nil
}

// Exec runs command in the named container of a pod through the Kubernetes
// pod "exec" subresource and returns what the command wrote to stdout and
// stderr.
//
// When stdin is non-nil it is streamed to the remote process; otherwise the
// remote process is started without a stdin stream. If the request cannot be
// prepared (scheme registration or executor creation fails) both output
// strings are empty. If streaming fails, whatever output was captured before
// the failure is returned together with the error.
func (k *KubeAPI) Exec(namespace, pod, container string, stdin io.Reader, command []string) (string, string, error) {
	var stdout, stderr bytes.Buffer

	// A scheme that knows the core/v1 types is required to encode
	// PodExecOptions as query parameters.
	scheme := runtime.NewScheme()
	if err := corev1.AddToScheme(scheme); err != nil {
		// Return the error instead of terminating the process: the caller
		// decides how to react.
		return "", "", fmt.Errorf("adding core/v1 to scheme: %w", err)
	}
	parameterCodec := runtime.NewParameterCodec(scheme)

	request := k.Client.CoreV1().RESTClient().Post().
		Resource("pods").SubResource("exec").
		Namespace(namespace).Name(pod).
		VersionedParams(&corev1.PodExecOptions{
			Container: container,
			Command:   command,
			Stdin:     stdin != nil,
			Stdout:    true,
			Stderr:    true,
		}, parameterCodec)

	exec, err := remotecommand.NewSPDYExecutor(k.Config, "POST", request.URL())
	if err != nil {
		return "", "", err
	}

	err = exec.Stream(remotecommand.StreamOptions{
		Stdin:  stdin,
		Stdout: &stdout,
		Stderr: &stderr,
	})

	return stdout.String(), stderr.String(), err
}

// runRemoteCommand executes cmd in the container described by cfg. The
// elements of cmd are joined with single spaces and fed on stdin to a "bash"
// process started in that container. It returns the remote stdout, stderr and
// any error reported by kubeapi.Exec.
func runRemoteCommand(kubeapi *KubeAPI, cfg runRemoteCommandConfig, cmd []string) (string, string, error) {
	script := strings.Join(cmd, " ")
	return kubeapi.Exec(
		cfg.namespace,
		cfg.podName,
		cfg.container,
		strings.NewReader(script),
		[]string{"bash"},
	)
}

// NewForConfig builds a KubeAPI from config: it creates a clientset for the
// configuration and stores both in the result. The error from clientset
// creation is returned alongside the (always non-nil) KubeAPI pointer.
func NewForConfig(config *rest.Config) (*KubeAPI, error) {
	client, err := kubernetes.NewForConfig(config)
	return &KubeAPI{Client: client, Config: config}, err
}

// NewConfig resolves the REST configuration used to talk to the cluster.
func NewConfig() (*rest.Config, error) {
	// Default loading rules look at the kubeconfig files named in the
	// environment or found in the home directory.
	rules := clientcmd.NewDefaultClientConfigLoadingRules()
	overrides := &clientcmd.ConfigOverrides{}

	// The deferred loader falls back to an in-cluster config when the default
	// loading rules produce no results.
	deferred := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides)
	return deferred.ClientConfig()
}

// killMysqld runs leaderStopCommand inside the "mysql" container of this pod
// (podName in namespace ns) via the Kubernetes exec API.
//
// The remote stdout and stderr are logged whether or not the command
// succeeds. Any failure — loading the client config, creating the client, or
// executing the command — is returned to the caller rather than panicking or
// exiting the process, so the caller's error handling actually runs.
func killMysqld() error {
	config, err := NewConfig()
	if err != nil {
		return fmt.Errorf("loading kubernetes client config: %w", err)
	}
	k, err := NewForConfig(config)
	if err != nil {
		return fmt.Errorf("creating kubernetes client: %w", err)
	}
	cfg := runRemoteCommandConfig{
		podName:   podName,
		namespace: ns,
		container: "mysql",
	}

	log.Infof("killing mysql command: %s", leaderStopCommand)
	output, stderr, err := runRemoteCommand(k, cfg, []string{leaderStopCommand})
	log.Info("output=[" + output + "]")
	log.Info("stderr=[" + stderr + "]")
	if err != nil {
		return fmt.Errorf("running %q in container %q: %w", leaderStopCommand, cfg.container, err)
	}
	return nil
}
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
Expand Down Expand Up @@ -297,6 +298,7 @@ github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8=
github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c=
github.com/moby/term v0.0.0-20201216013528-df9cb8a40635/go.mod h1:FBS0z0QWA44HXygs7VXDUOGoN/1TV3RuWkLO04am3wc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand Down
5 changes: 5 additions & 0 deletions mysqlcluster/syncer/role.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func NewRoleSyncer(cli client.Client, c *mysqlcluster.MysqlCluster) syncer.Inter
APIGroups: []string{"batch"},
Resources: []string{"jobs"},
},
{
Verbs: []string{"create"},
APIGroups: []string{""},
Resources: []string{"pods/exec"},
},
}
return nil
})
Expand Down

0 comments on commit 5a471c8

Please sign in to comment.