Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xenon: add preStop timeout #612

Merged
merged 1 commit into from
Jul 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ COPY go.mod go.mod
COPY go.sum go.sum
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
# RUN go env -w GOPROXY=https://goproxy.cn,direct
RUN if [ $(cat /etc/timezone) = "Asia/Shanghai" ] ; then go env -w GOPROXY=https://goproxy.cn,direct; fi
RUN go mod download

# Copy the go source
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.sidecar
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ COPY go.sum go.sum

# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
# RUN go env -w GOPROXY=https://goproxy.cn,direct;
RUN if [ $(cat /etc/timezone) = "Asia/Shanghai" ] ; then go env -w GOPROXY=https://goproxy.cn,direct; fi
# go mod download
RUN go mod download

Expand Down
2 changes: 1 addition & 1 deletion build/mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ COPY go.mod go.mod
COPY go.sum go.sum
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
RUN go env -w GOPROXY=https://goproxy.cn,direct;
RUN if [ $(cat /etc/timezone) = "Asia/Shanghai" ] ; then go env -w GOPROXY=https://goproxy.cn,direct; fi
# go mod download
RUN go mod download

Expand Down
2 changes: 1 addition & 1 deletion build/xenon/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
FROM golang:1.16 as builder

ARG XENON_BRANCH=master
RUN go env -w GOPROXY=https://goproxy.cn,direct && go env -w GO111MODULE=off;
# Point GOPROXY at goproxy.cn only when the build host timezone is Asia/Shanghai, then turn GO111MODULE off.
RUN if [ $(cat /etc/timezone) = "Asia/Shanghai" ] ; then go env -w GOPROXY=https://goproxy.cn,direct; fi && go env -w GO111MODULE=off;
RUN set -ex; \
mkdir -p /go/src/github.com/radondb; \
cd /go/src/github.com/radondb; \
Expand Down
219 changes: 169 additions & 50 deletions cmd/xenon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"database/sql"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"strconv"
Expand All @@ -15,11 +16,26 @@ import (
_ "github.com/go-sql-driver/mysql"
. "github.com/radondb/radondb-mysql-kubernetes/utils"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
)

// KubeAPI bundles a Kubernetes clientset with a REST configuration; Exec
// uses both to issue a pod exec request.
type KubeAPI struct {
// Client is the clientset Exec uses to build the exec request URL.
Client *kubernetes.Clientset
// Config is the REST configuration Exec hands to the SPDY executor.
Config *rest.Config
}

// runRemoteCommandConfig names the container in which runRemoteCommand
// executes a command.
type runRemoteCommandConfig struct {
// container, namespace and podName are forwarded to KubeAPI.Exec as the exec target.
container, namespace, podName string
}

const (
leaderStopCommand = "kill -9 $(pidof mysqld)"
mysqlUser = "root"
Expand Down Expand Up @@ -115,65 +131,80 @@ func leaderStop() error {
log.Info("I am readonly, skip the leader stop")
os.Exit(0)
}
ch := make(chan error)
go func() {
defer func() {
if err := enableMyRaft(); err != nil {
log.Error(err)
}
}()

// Step 1: disable event scheduler
log.Info("Disabling event scheduler")
stmt, err := SetEventScheduler(conn, false)
if err != nil {
return fmt.Errorf("failed to disable event scheduler: %s", err.Error())
}
log.Infof("set event scheduler to false: %s", stmt)
log.Info("Raft disable")
if err := disableMyRaft(); err != nil {
log.Errorf("failed to failover: %v", err)
ch <- err
}

// Step 2: set readonly
log.Info("Setting readonly")
stmt, err = SetReadOnly(conn, true)
if err != nil {
return fmt.Errorf("failed to set readonly: %s", err.Error())
}
log.Infof("set readonly to true: %s", stmt)
// Step 1: disable event scheduler
log.Info("Disabling event scheduler")
stmt, err := SetEventScheduler(conn, false)
if err != nil {
ch <- err
}
log.Infof("set event scheduler to false: %s", stmt)

// Step 3: check long running writes
log.Info("Checking long running writes")
num, stmt, err := CheckLongRunningWrites(conn, 4)
if err != nil {
return fmt.Errorf("failed to check long running writes: %s", err.Error())
}
log.Infof("%d,long running writes: %s", num, stmt)
// Step 2: set readonly
log.Info("Setting readonly")
stmt, err = SetReadOnly(conn, true)
if err != nil {
ch <- err
}
log.Infof("set readonly to true: %s", stmt)

// TODO: Step 4: set max connections
// Step 3: check long running writes
log.Info("Checking long running writes")
num, stmt, err := CheckLongRunningWrites(conn, 4)
if err != nil {
ch <- err
}
log.Infof("%d,long running writes: %s", num, stmt)

// Step 5: kill threads
log.Info("Killing threads")
err = KillThreads(conn)
if err != nil {
return fmt.Errorf("failed to kill threads: %s", err.Error())
}
// TODO: Step 4: set max connections

// Step 6: FlushTablesWithReadLock
log.Info("Flushing tables with read lock")
stmt, err = FlushTablesWithReadLock(conn)
if err != nil {
return fmt.Errorf("failed to flush tables with read lock: %s", err.Error())
}
log.Info("flushed tables with read lock: ", stmt)
// Step 5: kill threads
log.Info("Killing threads")
err = KillThreads(conn)
if err != nil {
ch <- err
}

// Step 7: FlushBinaryLogs
log.Info("Flushing binary logs")
stmt, err = FlushBinaryLogs(conn)
if err != nil {
return fmt.Errorf("failed to flush binary logs: %s", err.Error())
}
log.Info("flushed binary logs:", stmt)
// Step 6: FlushTablesWithReadLock
log.Info("Flushing tables with read lock")
stmt, err = FlushTablesWithReadLock(conn)
if err != nil {
ch <- err
}
log.Info("flushed tables with read lock: ", stmt)

// Step 8: Failover
log.Info("Failover")
time.Sleep(2 * time.Second)
if err := DoFailOver(); err != nil {
return fmt.Errorf("failed to failover: %s", err.Error())
// Step 7: FlushBinaryLogs
log.Info("Flushing binary logs")
stmt, err = FlushBinaryLogs(conn)
if err != nil {
ch <- err
}
log.Info("flushed binary logs:", stmt)
}()
select {
case err := <-ch:
return err
case <-time.After(5 * time.Second):
log.Info("timeout")
if err := killMysqld(); err != nil {
return err
}
return nil
}

log.Info("leader stop finished")
return nil
}

func liveness() error {
Expand Down Expand Up @@ -424,7 +455,7 @@ func DoFailOver() error {
if err := disableMyRaft(); err != nil {
return err
}
return WaitForNewLeader(time.Minute * 2)
return nil
}

func enableMyRaft() error {
Expand Down Expand Up @@ -534,3 +565,91 @@ func patchPodLabel(n MySQLNode, patch string) error {
}
return nil
}

// Exec runs command in the given container of a pod through the Kubernetes
// "exec" subresource and blocks until the stream finishes.
//
// stdin may be nil; when it is non-nil it is streamed to the remote process.
// The two returned strings hold whatever the remote process wrote to stdout
// and stderr. They are returned even when the stream ends with an error, so
// callers can still log partial output.
func (k *KubeAPI) Exec(namespace, pod, container string, stdin io.Reader, command []string) (string, string, error) {
	var stdout, stderr bytes.Buffer

	// A scheme that knows the core/v1 types is needed to encode
	// PodExecOptions as request parameters.
	scheme := runtime.NewScheme()
	if err := corev1.AddToScheme(scheme); err != nil {
		// Report the failure to the caller rather than terminating the
		// whole process from inside a helper.
		return "", "", fmt.Errorf("adding core/v1 to scheme: %w", err)
	}
	codec := runtime.NewParameterCodec(scheme)

	request := k.Client.CoreV1().RESTClient().Post().
		Resource("pods").SubResource("exec").
		Namespace(namespace).Name(pod).
		VersionedParams(&corev1.PodExecOptions{
			Container: container,
			Command:   command,
			Stdin:     stdin != nil,
			Stdout:    true,
			Stderr:    true,
		}, codec)

	// Named "executor" so the imported os/exec package is not shadowed.
	executor, err := remotecommand.NewSPDYExecutor(k.Config, "POST", request.URL())
	if err != nil {
		return "", "", err
	}

	err = executor.Stream(remotecommand.StreamOptions{
		Stdin:  stdin,
		Stdout: &stdout,
		Stderr: &stderr,
	})

	return stdout.String(), stderr.String(), err
}

// runRemoteCommand joins cmd with single spaces and feeds the resulting
// script on stdin to a "bash" process started in the container described by
// cfg. It returns the remote stdout, the remote stderr and any exec error.
func runRemoteCommand(kubeapi *KubeAPI, cfg runRemoteCommandConfig, cmd []string) (string, string, error) {
	script := strings.Join(cmd, " ")
	return kubeapi.Exec(
		cfg.namespace,
		cfg.podName,
		cfg.container,
		strings.NewReader(script),
		[]string{"bash"},
	)
}

// NewForConfig builds a KubeAPI whose clientset is created from config.
//
// On failure it returns a nil *KubeAPI together with the wrapped error, so a
// caller can never end up holding a value whose Client is nil.
func NewForConfig(config *rest.Config) (*KubeAPI, error) {
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("creating kubernetes clientset: %w", err)
	}
	return &KubeAPI{Client: client, Config: config}, nil
}

// NewConfig resolves the REST configuration used to talk to the cluster.
func NewConfig() (*rest.Config, error) {
	// Default rules: look at the files named by the environment, then fall
	// back to the home directory.
	rules := clientcmd.NewDefaultClientConfigLoadingRules()
	overrides := &clientcmd.ConfigOverrides{}

	// The deferred loader falls back to an in-cluster config when the
	// default rules yield nothing.
	deferred := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides)
	return deferred.ClientConfig()
}

// killMysqld runs leaderStopCommand in the "mysql" container of the pod
// identified by the package-level podName and ns, using the Kubernetes exec
// API, and logs the command's stdout and stderr.
//
// Every failure (loading the client config, creating the client, running the
// command) is returned to the caller instead of panicking or exiting the
// process, so the caller's error handling actually runs.
func killMysqld() error {
	config, err := NewConfig()
	if err != nil {
		return fmt.Errorf("loading kubernetes client config: %w", err)
	}
	k, err := NewForConfig(config)
	if err != nil {
		return fmt.Errorf("creating kubernetes client: %w", err)
	}
	cfg := runRemoteCommandConfig{
		podName:   podName,
		namespace: ns,
		container: "mysql",
	}

	log.Infof("killing mysql command: %s", leaderStopCommand)
	output, stderr, err := runRemoteCommand(k, cfg, []string{leaderStopCommand})
	// Log both streams before checking err: they are the only diagnostics
	// available when the remote command fails.
	log.Info("output=[" + output + "]")
	log.Info("stderr=[" + stderr + "]")
	if err != nil {
		return fmt.Errorf("running %q in pod %v/%v: %w", leaderStopCommand, ns, podName, err)
	}
	return nil
}
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
Expand Down Expand Up @@ -297,6 +298,7 @@ github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8=
github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c=
github.com/moby/term v0.0.0-20201216013528-df9cb8a40635/go.mod h1:FBS0z0QWA44HXygs7VXDUOGoN/1TV3RuWkLO04am3wc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand Down
5 changes: 5 additions & 0 deletions mysqlcluster/syncer/role.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func NewRoleSyncer(cli client.Client, c *mysqlcluster.MysqlCluster) syncer.Inter
APIGroups: []string{"batch"},
Resources: []string{"jobs"},
},
{
Verbs: []string{"create"},
APIGroups: []string{""},
Resources: []string{"pods/exec"},
},
}
return nil
})
Expand Down