From 7bf5abc5a2ce49277a2cd7c6df72bf966ffd11b7 Mon Sep 17 00:00:00 2001
From: Michael Wan
Date: Mon, 7 May 2018 02:23:19 -0400
Subject: [PATCH] feature: support take over old containerd instance when pouchd restart

Signed-off-by: Michael Wan
---
 ctrd/client.go          | 206 +++++++++++++++++++++++++++++++++++-----
 ctrd/client_opts.go     | 109 +++++++++++++++++++++
 ctrd/client_test.go     |   5 +-
 ctrd/interface.go       |   1 +
 ctrd/wrapper_client.go  |   6 +-
 daemon/config/config.go |   3 +
 daemon/daemon.go        |  33 ++++++-
 main.go                 |  49 ++++------
 pkg/utils/utils.go      |  29 ++++++
 9 files changed, 375 insertions(+), 66 deletions(-)
 create mode 100644 ctrd/client_opts.go

diff --git a/ctrd/client.go b/ctrd/client.go
index 8aa1d5968..c14fbb39f 100644
--- a/ctrd/client.go
+++ b/ctrd/client.go
@@ -3,10 +3,18 @@ package ctrd
 import (
 	"context"
 	"fmt"
+	"io"
+	"os"
+	"os/exec"
+	"path"
+	"strconv"
+	"strings"
 	"sync"
+	"syscall"
 	"time"
 
 	"github.com/alibaba/pouch/pkg/scheduler"
+	"github.com/alibaba/pouch/pkg/utils"
 
 	"github.com/containerd/containerd"
 	"github.com/sirupsen/logrus"
@@ -14,26 +22,24 @@ import (
 
 const (
 	unixSocketPath                = "/run/containerd/containerd.sock"
+	containerdPidFileName         = "containerd.pid"
 	defaultGrpcClientPoolCapacity = 5
 	defaultMaxStreamsClient       = 100
+	containerdShutdownTimeout     = 15 * time.Second
 )
 
-// Config represents the config used to communicated with containerd.
-type Config struct {
-	Address string
-	// GrpcClientPoolCapacity is the capacity of grpc client pool.
-	GrpcClientPoolCapacity int
-	// MaxStreamsClient records the max number of concurrent streams
-	MaxStreamsClient int
-}
-
 // Client is the client side the daemon holds to communicate with containerd.
 type Client struct {
-	mu sync.RWMutex
-	Config
+	mu    sync.RWMutex
 	watch *watch
 	lock  *containerLock
 
+	daemonPid      int
+	homeDir        string
+	rpcAddr        string
+	oomScoreAdjust int
+	debugLog       bool
+
 	// containerd grpc pool
 	pool      []scheduler.Factory
 	scheduler scheduler.Scheduler
@@ -42,38 +48,50 @@ type Client struct {
 }
 
 // NewClient connect to containerd.
-func NewClient(cfg Config) (APIClient, error) {
-	if cfg.Address == "" {
-		cfg.Address = unixSocketPath
-	}
-
-	if cfg.GrpcClientPoolCapacity <= 0 {
-		cfg.GrpcClientPoolCapacity = defaultGrpcClientPoolCapacity
+func NewClient(homeDir string, opts ...ClientOpt) (APIClient, error) {
+	// set default value for parameters
+	copts := clientOpts{
+		rpcAddr:                unixSocketPath,
+		grpcClientPoolCapacity: defaultGrpcClientPoolCapacity,
+		maxStreamsClient:       defaultMaxStreamsClient,
 	}
 
-	if cfg.MaxStreamsClient <= 0 {
-		cfg.MaxStreamsClient = defaultMaxStreamsClient
+	for _, opt := range opts {
+		if err := opt(&copts); err != nil {
+			return nil, err
+		}
 	}
 
 	client := &Client{
-		Config: cfg,
 		lock: &containerLock{
 			ids: make(map[string]struct{}),
 		},
 		watch: &watch{
 			containers: make(map[string]*containerPack),
 		},
+		daemonPid:      -1,
+		homeDir:        homeDir,
+		oomScoreAdjust: copts.oomScoreAdjust,
+		debugLog:       copts.debugLog,
+		rpcAddr:        copts.rpcAddr,
+	}
+
+	// start new containerd instance.
+	if copts.startDaemon {
+		if err := client.runContainerdDaemon(homeDir, copts); err != nil {
+			return nil, err
+		}
 	}
 
-	for i := 0; i < cfg.GrpcClientPoolCapacity; i++ {
-		cli, err := newWrapperClient(cfg)
+	for i := 0; i < copts.grpcClientPoolCapacity; i++ {
+		cli, err := newWrapperClient(copts.rpcAddr, copts.maxStreamsClient)
 		if err != nil {
 			return nil, fmt.Errorf("failed to create containerd client: %v", err)
 		}
 		client.pool = append(client.pool, cli)
 	}
 
-	logrus.Infof("success to create %d containerd clients, connect to: %s", cfg.GrpcClientPoolCapacity, cfg.Address)
+	logrus.Infof("success to create %d containerd clients, connect to: %s", copts.grpcClientPoolCapacity, copts.rpcAddr)
 
 	scheduler, err := scheduler.NewLRUScheduler(client.pool)
 	if err != nil {
@@ -166,3 +184,143 @@ func (c *Client) Version(ctx context.Context) (containerd.Version, error) {
 
 	return cli.client.Version(ctx)
 }
+
+func (c *Client) runContainerdDaemon(homeDir string, copts clientOpts) error {
+	if homeDir == "" {
+		return fmt.Errorf("ctrd: containerd home dir should not be empty")
+	}
+
+	containerdPath, err := exec.LookPath(copts.containerdBinary)
+	if err != nil {
+		return fmt.Errorf("failed to find containerd binary %s: %v", copts.containerdBinary, err)
+	}
+
+	stateDir := path.Join(homeDir, "containerd/state")
+	if _, err := os.Stat(stateDir); err != nil && os.IsNotExist(err) {
+		if err := os.MkdirAll(stateDir, 0711); err != nil {
+			return fmt.Errorf("failed to mkdir %s: %v", stateDir, err)
+		}
+	}
+
+	pidFileName := path.Join(stateDir, containerdPidFileName)
+	f, err := os.OpenFile(pidFileName, os.O_RDWR|os.O_CREATE, 0600)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	buf := make([]byte, 8)
+	num, err := f.Read(buf)
+	if err != nil && err != io.EOF {
+		return err
+	}
+
+	if num > 0 {
+		pid, err := strconv.ParseUint(strings.TrimSpace(string(buf[:num])), 10, 64)
+		if err != nil {
+			return err
+		}
+		if pid > 0 && utils.IsProcessAlive(int(pid)) {
+			logrus.Infof("ctrd: previous instance of containerd still alive (%d)", pid)
+			c.daemonPid = int(pid)
+			return nil
+		}
+	}
+
+	// no live instance recorded: rewind and truncate the pid file before writing the new pid
+	_, err = f.Seek(0, io.SeekStart)
+	if err != nil {
+		return err
+	}
+
+	if err := f.Truncate(0); err != nil {
+		return err
+	}
+
+	// if socket file exists, delete it.
+	if _, err := os.Stat(c.rpcAddr); err == nil {
+		os.RemoveAll(c.rpcAddr)
+	}
+
+	cmd, err := c.newContainerdCmd(containerdPath)
+	if err != nil {
+		return err
+	}
+
+	if err := utils.SetOOMScore(cmd.Process.Pid, c.oomScoreAdjust); err != nil {
+		utils.KillProcess(cmd.Process.Pid)
+		return err
+	}
+
+	if _, err := f.WriteString(fmt.Sprintf("%d", cmd.Process.Pid)); err != nil {
+		utils.KillProcess(cmd.Process.Pid)
+		return err
+	}
+
+	go cmd.Wait()
+
+	c.daemonPid = cmd.Process.Pid
+	return nil
+}
+
+func (c *Client) newContainerdCmd(containerdPath string) (*exec.Cmd, error) {
+	// Start a new containerd instance
+	args := []string{
+		"-a", c.rpcAddr,
+		"--root", path.Join(c.homeDir, "containerd/root"),
+		"--state", path.Join(c.homeDir, "containerd/state"),
+		"-l", utils.If(c.debugLog, "debug", "info").(string),
+	}
+
+	cmd := exec.Command(containerdPath, args...)
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+	cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true, Pdeathsig: syscall.SIGKILL}
+	cmd.Env = nil
+	// clear the NOTIFY_SOCKET from the env when starting containerd
+	for _, e := range os.Environ() {
+		if !strings.HasPrefix(e, "NOTIFY_SOCKET") {
+			cmd.Env = append(cmd.Env, e)
+		}
+	}
+
+	if err := cmd.Start(); err != nil {
+		return nil, err
+	}
+
+	logrus.Infof("ctrd: new containerd process, pid: %d", cmd.Process.Pid)
+	return cmd, nil
+}
+
+// Cleanup stops the containerd instance tracked by this client and removes its pid file and socket.
+func (c *Client) Cleanup() error {
+	if c.daemonPid == -1 {
+		return nil
+	}
+
+	if err := c.Close(); err != nil {
+		return err
+	}
+
+	// Ask the daemon to quit
+	syscall.Kill(c.daemonPid, syscall.SIGTERM)
+
+	// Wait up to 15secs for it to stop
+	for i := time.Duration(0); i < containerdShutdownTimeout; i += time.Second {
+		if !utils.IsProcessAlive(c.daemonPid) {
+			break
+		}
+		time.Sleep(time.Second)
+	}
+
+	if utils.IsProcessAlive(c.daemonPid) {
+		logrus.Warnf("ctrd: containerd (%d) didn't stop within 15secs, killing it\n", c.daemonPid)
+		syscall.Kill(c.daemonPid, syscall.SIGKILL)
+	}
+
+	// cleanup some files
+	os.Remove(path.Join(c.homeDir, "containerd/state", containerdPidFileName))
+	os.Remove(c.rpcAddr)
+
+	return nil
+}
diff --git a/ctrd/client_opts.go b/ctrd/client_opts.go
new file mode 100644
index 000000000..b4b4deb6e
--- /dev/null
+++ b/ctrd/client_opts.go
@@ -0,0 +1,109 @@
+package ctrd
+
+import "fmt"
+
+type clientOpts struct {
+	startDaemon            bool
+	debugLog               bool
+	rpcAddr                string
+	homeDir                string
+	containerdBinary       string
+	grpcClientPoolCapacity int
+	maxStreamsClient       int
+	oomScoreAdjust         int
+}
+
+// ClientOpt allows caller to set options for containerd client.
+type ClientOpt func(c *clientOpts) error
+
+// WithStartDaemon set startDaemon flag for containerd client.
+// startDaemon is a flag to decide whether start a new containerd instance
+// when create a containerd client.
+func WithStartDaemon(startDaemon bool) ClientOpt {
+	return func(c *clientOpts) error {
+		c.startDaemon = startDaemon
+		return nil
+	}
+}
+
+// WithRPCAddr set containerd listen address.
+func WithRPCAddr(rpcAddr string) ClientOpt {
+	return func(c *clientOpts) error {
+		if rpcAddr == "" {
+			return fmt.Errorf("rpc socket path is empty")
+		}
+
+		c.rpcAddr = rpcAddr
+		return nil
+	}
+}
+
+// WithDebugLog set debugLog flag for containerd client.
+// debugLog decides containerd log level.
+func WithDebugLog(debugLog bool) ClientOpt {
+	return func(c *clientOpts) error {
+		c.debugLog = debugLog
+		return nil
+	}
+}
+
+// WithHomeDir set home dir for containerd.
+func WithHomeDir(homeDir string) ClientOpt {
+	return func(c *clientOpts) error {
+		if homeDir == "" {
+			return fmt.Errorf("containerd home Dir is empty")
+		}
+
+		c.homeDir = homeDir
+		return nil
+	}
+}
+
+// WithContainerdBinary specifies the containerd binary path.
+func WithContainerdBinary(containerdBinary string) ClientOpt {
+	return func(c *clientOpts) error {
+		if containerdBinary == "" {
+			return fmt.Errorf("containerd binary path is empty")
+		}
+
+		c.containerdBinary = containerdBinary
+		return nil
+	}
+}
+
+// WithGrpcClientPoolCapacity sets containerd clients pool capacity.
+func WithGrpcClientPoolCapacity(grpcClientPoolCapacity int) ClientOpt {
+	return func(c *clientOpts) error {
+		if grpcClientPoolCapacity <= 0 {
+			return fmt.Errorf("containerd clients pool capacity should be positive number")
+		}
+
+		c.grpcClientPoolCapacity = grpcClientPoolCapacity
+		return nil
+	}
+}
+
+// WithMaxStreamsClient sets one containerd grpc client can hold max streams client.
+func WithMaxStreamsClient(maxStreamsClient int) ClientOpt {
+	return func(c *clientOpts) error {
+
+		if maxStreamsClient <= 0 {
+			return fmt.Errorf("containerd max streams client should be positive number")
+		}
+
+		c.maxStreamsClient = maxStreamsClient
+		return nil
+	}
+}
+
+// WithOOMScoreAdjust sets oom-score for containerd instance.
+func WithOOMScoreAdjust(oomScore int) ClientOpt {
+	return func(c *clientOpts) error {
+		if oomScore > 1000 || oomScore < -1000 {
+			return fmt.Errorf("oom-score range should be [-1000, 1000]")
+		}
+
+		c.oomScoreAdjust = oomScore
+		return nil
+	}
+}
diff --git a/ctrd/client_test.go b/ctrd/client_test.go
index 98f574d5b..4b2ae5ef4 100644
--- a/ctrd/client_test.go
+++ b/ctrd/client_test.go
@@ -7,7 +7,8 @@ import (
 
 func TestNewClient(t *testing.T) {
 	type args struct {
-		cfg Config
+		homeDir string
+		opts    []ClientOpt
 	}
 	tests := []struct {
 		name    string
@@ -19,7 +20,7 @@ func TestNewClient(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			got, err := NewClient(tt.args.cfg)
+			got, err := NewClient(tt.args.homeDir, tt.args.opts...)
 			if (err != nil) != tt.wantErr {
 				t.Errorf("NewClient() error = %v, wantErr %v", err, tt.wantErr)
 				return
diff --git a/ctrd/interface.go b/ctrd/interface.go
index 8fe687b2c..b3f5ed598 100644
--- a/ctrd/interface.go
+++ b/ctrd/interface.go
@@ -20,6 +20,7 @@ type APIClient interface {
 	SnapshotAPIClient
 
 	Version(ctx context.Context) (containerd.Version, error)
+	Cleanup() error
 }
 
 // ContainerAPIClient provides access to containerd container features.
diff --git a/ctrd/wrapper_client.go b/ctrd/wrapper_client.go
index ca618f632..88bb221fc 100644
--- a/ctrd/wrapper_client.go
+++ b/ctrd/wrapper_client.go
@@ -25,11 +25,11 @@ type WrapperClient struct {
 	streamQuota int
 }
 
-func newWrapperClient(cfg Config) (*WrapperClient, error) {
+func newWrapperClient(rpcAddr string, maxStreamsClient int) (*WrapperClient, error) {
 	options := []containerd.ClientOpt{
 		containerd.WithDefaultNamespace("default"),
 	}
-	cli, err := containerd.New(cfg.Address, options...)
+	cli, err := containerd.New(rpcAddr, options...)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to connect containerd")
 	}
@@ -52,7 +52,7 @@ func newWrapperClient(cfg Config) (*WrapperClient, error) {
 	return &WrapperClient{
 		client:      cli,
 		lease:       &lease,
-		streamQuota: cfg.MaxStreamsClient,
+		streamQuota: maxStreamsClient,
 	}, nil
 }
 
diff --git a/daemon/config/config.go b/daemon/config/config.go
index fb13785e2..bee127d40 100644
--- a/daemon/config/config.go
+++ b/daemon/config/config.go
@@ -105,6 +105,9 @@ type Config struct {
 
 	// RegistryService
 	RegistryService types.RegistryServiceConfig `json:"registry-service, omitempty" `
+
+	// oom_score_adj for the daemon
+	OOMScoreAdjust int `json:"oom-score-adjust,omitempty"`
 }
 
 // Validate validates the user input config.
diff --git a/daemon/daemon.go b/daemon/daemon.go
index a98f85645..7476b7cf5 100644
--- a/daemon/daemon.go
+++ b/daemon/daemon.go
@@ -63,9 +63,18 @@ func NewDaemon(cfg *config.Config) *Daemon {
 		return nil
 	}
 
-	containerd, err := ctrd.NewClient(ctrd.Config{
-		Address: cfg.ContainerdAddr,
-	})
+	// New containerd client
+	containerdBinaryFile := "containerd"
+	if cfg.ContainerdPath != "" {
+		containerdBinaryFile = cfg.ContainerdPath
+	}
+	containerd, err := ctrd.NewClient(cfg.HomeDir,
+		ctrd.WithDebugLog(cfg.Debug),
+		ctrd.WithStartDaemon(true),
+		ctrd.WithContainerdBinary(containerdBinaryFile),
+		ctrd.WithRPCAddr(cfg.ContainerdAddr),
+		ctrd.WithOOMScoreAdjust(cfg.OOMScoreAdjust),
+	)
 	if err != nil {
 		logrus.Errorf("failed to new containerd's client: %v", err)
 		return nil
@@ -216,7 +225,23 @@ func (d *Daemon) Run() error {
 
 // Shutdown stops daemon.
 func (d *Daemon) Shutdown() error {
-	return d.server.Stop()
+	var errMsg string
+
+	if err := d.server.Stop(); err != nil {
+		errMsg = fmt.Sprintf("%s\n", err.Error())
+	}
+
+	logrus.Debugf("Start cleanup containerd...")
+
+	if err := d.containerd.Cleanup(); err != nil {
+		errMsg += fmt.Sprintf("%s\n", err.Error())
+	}
+
+	if errMsg != "" {
+		return fmt.Errorf("failed to shutdown pouchd: %s", errMsg)
+	}
+
+	return nil
 }
 
 // Config gets config of daemon.
diff --git a/main.go b/main.go
index ad4343fa6..248437dcd 100644
--- a/main.go
+++ b/main.go
@@ -3,7 +3,6 @@ package main
 import (
 	"fmt"
 	"os"
-	osexec "os/exec"
 	"os/signal"
 	"path"
 	"strings"
@@ -105,6 +104,7 @@ func setupFlags(cmd *cobra.Command) {
 	flagSet.StringSliceVar(&cfg.Labels, "label", []string{}, "Set metadata for Pouch daemon")
 	flagSet.BoolVar(&cfg.EnableProfiler, "enable-profiler", false, "Set if pouchd setup profiler")
 	flagSet.StringVar(&cfg.Pidfile, "pidfile", "/var/run/pouch.pid", "Save daemon pid")
+	flagSet.IntVar(&cfg.OOMScoreAdjust, "oom-score-adj", -500, "Set the oom_score_adj for the daemon")
 }
 
 // parse flags
@@ -164,32 +164,12 @@ func runDaemon() error {
 		}
 	}()
 
-	// define and start all required processes.
-	if _, err := os.Stat(cfg.ContainerdAddr); err == nil {
-		os.RemoveAll(cfg.ContainerdAddr)
-	}
-
-	containerdBinaryFile := "containerd"
-	if cfg.ContainerdPath != "" {
-		containerdBinaryFile = cfg.ContainerdPath
+	// set pouchd oom-score
+	if err := utils.SetOOMScore(os.Getpid(), cfg.OOMScoreAdjust); err != nil {
+		logrus.Errorf("failed to set oom-score for pouchd: %v", err)
 	}
 
-	containerdPath, err := osexec.LookPath(containerdBinaryFile)
-	if err != nil {
-		return fmt.Errorf("failed to find containerd binary %s: %s", containerdBinaryFile, err)
-	}
-
-	var processes exec.Processes = []*exec.Process{
-		{
-			Path: containerdPath,
-			Args: []string{
-				"-a", cfg.ContainerdAddr,
-				"--root", path.Join(cfg.HomeDir, "containerd/root"),
-				"--state", path.Join(cfg.HomeDir, "containerd/state"),
-				"-l", utils.If(cfg.Debug, "debug", "info").(string),
-			},
-		},
-	}
+	// define and start all required processes.
 
 	if cfg.QuotaDriver != "" {
 		quota.SetQuotaDriver(cfg.QuotaDriver)
@@ -198,15 +178,12 @@ func runDaemon() error {
 	if err := checkLxcfsCfg(); err != nil {
 		return err
 	}
-	defer processes.StopAll()
-
-	if err := processes.RunAll(); err != nil {
-		return err
-	}
-	sigHandles = append(sigHandles, processes.StopAll)
 
 	// initialize signal and handle method.
-	signals := make(chan os.Signal, 1)
+	var (
+		waitExit = make(chan struct{})
+		signals  = make(chan os.Signal, 1)
+	)
 	signal.Notify(signals, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGHUP)
 	go func() {
 		sig := <-signals
@@ -217,6 +194,8 @@ func runDaemon() error {
 				logrus.Errorf("failed to handle signal: %v", err)
 			}
 		}
+
+		close(waitExit)
 		os.Exit(1)
 	}()
 
@@ -228,7 +207,11 @@ func runDaemon() error {
 
 	sigHandles = append(sigHandles, d.ShutdownPlugin, d.Shutdown)
 
-	return d.Run()
+	err := d.Run()
+
+	<-waitExit
+
+	return err
 }
 
 // initLog initializes log Level and log format of daemon.
diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go
index cdcff13a7..f218cf211 100644
--- a/pkg/utils/utils.go
+++ b/pkg/utils/utils.go
@@ -8,6 +8,7 @@ import (
 	"reflect"
 	"strconv"
 	"strings"
+	"syscall"
 	"time"
 )
 
@@ -268,3 +269,31 @@ func NewPidfile(path string) error {
 	}
 	return ioutil.WriteFile(path, []byte(fmt.Sprintf("%d", os.Getpid())), 0644)
 }
+
+// IsProcessAlive reports whether a process with the given positive pid exists (probed with signal 0).
+func IsProcessAlive(pid int) bool {
+	if pid <= 0 {
+		return false
+	}
+
+	err := syscall.Kill(pid, syscall.Signal(0))
+	return err == nil || err == syscall.EPERM
+}
+
+// KillProcess sends SIGKILL to the process with the given pid; the result of the signal is ignored.
+func KillProcess(pid int) {
+	syscall.Kill(pid, syscall.SIGKILL)
+}
+
+// SetOOMScore writes score to /proc/<pid>/oom_score_adj of the given process.
+// The higher the resulting oom_score of a process, the higher is its
+// likelihood of getting killed by the OOM Killer in an out-of-memory situation.
+func SetOOMScore(pid, score int) error {
+	f, err := os.OpenFile(fmt.Sprintf("/proc/%d/oom_score_adj", pid), os.O_WRONLY, 0)
+	if err != nil {
+		return err
+	}
+	_, err = f.WriteString(strconv.Itoa(score))
+	f.Close()
+	return err
+}