From 92688f249e742cef03e59f6b2719e98b4081dc1d Mon Sep 17 00:00:00 2001 From: zc Date: Thu, 27 Feb 2020 20:01:14 +0800 Subject: [PATCH] systemd engine can create virtualization --- engine/systemd/image.go | 1 - engine/systemd/systemd.go | 46 ++++++---- engine/systemd/unit_file.go | 142 +++++++++++++++++++++++++++++++ engine/systemd/virtualization.go | 48 +++++++++-- go.mod | 1 + go.sum | 2 + 6 files changed, 216 insertions(+), 24 deletions(-) create mode 100644 engine/systemd/unit_file.go diff --git a/engine/systemd/image.go b/engine/systemd/image.go index 0479ee2ba..a980bb63d 100644 --- a/engine/systemd/image.go +++ b/engine/systemd/image.go @@ -25,7 +25,6 @@ func (s *SystemdSSH) ImagesPrune(ctx context.Context) (err error) { } func (s *SystemdSSH) ImagePull(ctx context.Context, ref string, all bool) (reader io.ReadCloser, err error) { - err = engine.NotImplementedError return } diff --git a/engine/systemd/systemd.go b/engine/systemd/systemd.go index 1e0eaf1db..84bc4b43c 100644 --- a/engine/systemd/systemd.go +++ b/engine/systemd/systemd.go @@ -3,10 +3,12 @@ package systemd import ( "bytes" "context" + "io" "net" "strconv" "strings" + "github.com/pkg/errors" "github.com/projecteru2/core/engine" enginetypes "github.com/projecteru2/core/engine/types" coretypes "github.com/projecteru2/core/types" @@ -42,6 +44,7 @@ func MakeClient(ctx context.Context, config coretypes.Config, nodename, endpoint func (s *SystemdSSH) WithSession(f func(*ssh.Session) error) (err error) { session, err := s.client.NewSession() + defer session.Close() if err != nil { return } @@ -49,11 +52,11 @@ func (s *SystemdSSH) WithSession(f func(*ssh.Session) error) (err error) { } func (s *SystemdSSH) Info(ctx context.Context) (info *enginetypes.Info, err error) { - cpu, err := s.CPUInfo() + cpu, err := s.CPUInfo(ctx) if err != nil { return } - memory, err := s.MemoryInfo() + memory, err := s.MemoryInfo(ctx) if err != nil { return } @@ -64,29 +67,36 @@ func (s *SystemdSSH) Info(ctx context.Context) (info *enginetypes.Info, err erro }, nil } -func (s *SystemdSSH) CPUInfo() (cpu int, err error) { - stdout := &bytes.Buffer{} - if err = s.WithSession(func(session *ssh.Session) (err error) { - session.Stdout = stdout - return session.Run(cmdInspectCPUNumber) - }); err != nil { - return +func (s *SystemdSSH) CPUInfo(ctx context.Context) (cpu int, err error) { + stdout, stderr, err := s.runSingleCommand(ctx, cmdInspectCPUNumber, nil) + if err != nil { + return 0, errors.Wrap(err, stderr.String()) } - return strconv.Atoi(strings.TrimSpace(string(stdout.Bytes()))) + return strconv.Atoi(strings.TrimSpace(stdout.String())) } -func (s *SystemdSSH) MemoryInfo() (memoryTotalInBytes int64, err error) { - stdout := &bytes.Buffer{} - if err = s.WithSession(func(session *ssh.Session) (err error) { - session.Stdout = stdout - return session.Run(cmdInspectMemoryTotalInBytes) - }); err != nil { - return +func (s *SystemdSSH) MemoryInfo(ctx context.Context) (memoryTotalInBytes int64, err error) { + stdout, stderr, err := s.runSingleCommand(ctx, cmdInspectMemoryTotalInBytes, nil) + if err != nil { + return 0, errors.Wrap(err, stderr.String()) } - memory, err := strconv.Atoi(strings.TrimSpace(string(stdout.Bytes()))) + memory, err := strconv.Atoi(strings.TrimSpace(stdout.String())) return int64(memory), err } func (s *SystemdSSH) ResourceValidate(ctx context.Context, cpu float64, cpumap map[string]int64, memory, storage int64) (err error) { return engine.NotImplementedError } + +func (s *SystemdSSH) runSingleCommand(ctx context.Context, cmd string, stdin io.Reader) (stdout, stderr *bytes.Buffer, err error) { + // what a pathetic library that leaves context completely useless + + stdout = &bytes.Buffer{} + stderr = &bytes.Buffer{} + return stdout, stderr, s.WithSession(func(session *ssh.Session) error { + session.Stdin = stdin + session.Stdout = stdout + session.Stderr = stderr + return session.Run(cmd) + }) +} diff --git a/engine/systemd/unit_file.go b/engine/systemd/unit_file.go new file mode 100644 index 000000000..d87decc37 --- /dev/null +++ b/engine/systemd/unit_file.go @@ -0,0 +1,142 @@ +package systemd + +import ( + "bytes" + "encoding/json" + "fmt" + "strings" + + log "github.com/sirupsen/logrus" + + enginetypes "github.com/projecteru2/core/engine/types" +) + +const ( + unitTemplate = ` +[Unit] +%s + +[Service] +%s + ` +) + +type unitBuilder struct { + opts *enginetypes.VirtualizationCreateOptions + unitBuffer []string + serviceBuffer []string + err error +} + +type unitDesciption struct { + Name string + Labels map[string]string +} + +func (s *SystemdSSH) newUnitBuilder(opts *enginetypes.VirtualizationCreateOptions) *unitBuilder { + return &unitBuilder{ + opts: opts, + } +} + +func (b *unitBuilder) buildUnit() *unitBuilder { + if b.err != nil { + return b + } + + description, err := json.Marshal(unitDesciption{Name: b.opts.Name, Labels: b.opts.Labels}) + if err != nil { + b.err = err + return b + } + + b.unitBuffer = append(b.unitBuffer, []string{ + fmt.Sprintf("Description=%s", string(description)), + "After=network-online.target firewalld.service", + "Wants=network-online.target", + }...) + return b +} + +func (b *unitBuilder) buildResource(CPUAmount int) *unitBuilder { + if b.err != nil { + return b + } + + allowedCPUs := []string{} + for CPU, _ := range b.opts.CPU { + allowedCPUs = append(allowedCPUs, CPU) + } + + CPUQuotaPercentage := b.opts.Quota / float64(CPUAmount) * 100 + + b.serviceBuffer = append(b.serviceBuffer, []string{ + fmt.Sprintf("ExecStartPre=/usr/bin/cgcreate -g memory,cpuset:%s", b.opts.Name), + fmt.Sprintf("ExecStartPre=/usr/bin/cgset -r cpuset.cpus=%s %s", strings.Join(allowedCPUs, ","), b.opts.Name), + fmt.Sprintf("ExecStartPre=/usr/bin/cgset -r cpuset.mems=0 %s", b.opts.Name), + fmt.Sprintf("ExecStartPre=/usr/bin/cgset -r memory.limit_in_bytes=%d %s", b.opts.Memory, b.opts.Name), + fmt.Sprintf("CPUQuota=%.2f%%", CPUQuotaPercentage), + }...) + return b +} + +func (b *unitBuilder) buildExec() *unitBuilder { + if b.err != nil { + return b + } + + stdioType, err := b.convertToSystemdStdio(b.opts.LogType) + if err != nil { + b.err = err + return b + } + + restartPolicy, err := b.convertToSystemdRestartPolicy(b.opts.RestartPolicy) + if err != nil { + b.err = err + return b + } + + b.serviceBuffer = append(b.serviceBuffer, []string{ + fmt.Sprintf("ExecStart=/usr/bin/cgexec -g memory,cpuset:%s %s", b.opts.Name, strings.Join(b.opts.Cmd, " ")), + fmt.Sprintf("StandardOutput=%s", stdioType), + fmt.Sprintf("StandardError=%s", stdioType), + fmt.Sprintf("Restart=%s", restartPolicy), + }...) + return b +} + +func (b *unitBuilder) buffer() (*bytes.Buffer, error) { + unit := fmt.Sprintf(unitTemplate, + strings.Join(b.unitBuffer, "\n"), + strings.Join(b.serviceBuffer, "\n"), + ) + log.Debugf("%s", unit) + return bytes.NewBufferString(unit), b.err +} + +func (b *unitBuilder) convertToSystemdRestartPolicy(restart string) (policy string, err error) { + switch { + case restart == "no": + policy = "no" + case restart == "always" || restart == "": + policy = "always" + case strings.HasPrefix(restart, "on-failure"): + policy = "on-failure" + default: + err = fmt.Errorf("restart policy not supported: %s", restart) + } + return +} + +func (b *unitBuilder) convertToSystemdStdio(logType string) (stdioType string, err error) { + switch logType { + case "journald", "": + stdioType = "journal" + case "none": + stdioType = "null" + default: + err = fmt.Errorf("log type not supported: %s", logType) + } + return +} diff --git a/engine/systemd/virtualization.go b/engine/systemd/virtualization.go index 6516bcdcb..91a0a1332 100644 --- a/engine/systemd/virtualization.go +++ b/engine/systemd/virtualization.go @@ -2,20 +2,58 @@ package systemd import ( "context" + "fmt" "io" + "path/filepath" + "github.com/pkg/errors" "github.com/projecteru2/core/engine" enginetypes "github.com/projecteru2/core/engine/types" + "github.com/projecteru2/core/utils" +) + +const ( + cmdFileExist = `/usr/bin/test -f '%s'` + cmdCopyFromStdin = `/bin/cp -f /dev/stdin '%s'` + cmdMkdir = `/bin/mkdir -p %s` + + eruSystemdUnitPath = `/usr/local/lib/systemd/system/` ) func (s *SystemdSSH) VirtualizationCreate(ctx context.Context, opts *enginetypes.VirtualizationCreateOptions) (created *enginetypes.VirtualizationCreated, err error) { - err = engine.NotImplementedError - return + CPUAmount, err := s.CPUInfo(ctx) + if err != nil { + return + } + + buffer, err := s.newUnitBuilder(opts).buildUnit().buildResource(CPUAmount).buildExec().buffer() + basename := fmt.Sprintf("%s.service", opts.Name) + if err = s.VirtualizationCopyTo(ctx, "", filepath.Join(eruSystemdUnitPath, basename), buffer, true, true); err != nil { + return + } + return &enginetypes.VirtualizationCreated{ + ID: "SYSTEMD-" + utils.RandomString(46), + Name: opts.Name, + }, nil } -func (s *SystemdSSH) VirtualizationCopyTo(ctx context.Context, ID, path string, content io.Reader, AllowOverwriteDirWithFile, CopyUIDGID bool) (err error) { - err = engine.NotImplementedError - return +func (s *SystemdSSH) VirtualizationCopyTo(ctx context.Context, ID, path string, content io.Reader, AllowOverwriteDirWithFile, _ bool) (err error) { + // mkdir -p $(dirname $PATH) + dirname, _ := filepath.Split(path) + if _, stderr, err := s.runSingleCommand(ctx, fmt.Sprintf(cmdMkdir, dirname), nil); err != nil { + return errors.Wrap(err, stderr.String()) + } + + // test -f $PATH && exit -1 + if !AllowOverwriteDirWithFile { + if _, _, err = s.runSingleCommand(ctx, fmt.Sprintf(cmdFileExist, path), nil); err == nil { + return fmt.Errorf("[VirtualizationCopyTo] file existed: %s", path) + } + } + + // cp /dev/stdin $PATH + _, stderr, err := s.runSingleCommand(ctx, fmt.Sprintf(cmdCopyFromStdin, path), content) + return errors.Wrap(err, stderr.String()) } func (s *SystemdSSH) VirtualizationStart(ctx context.Context, ID string) (err error) { diff --git a/go.mod b/go.mod index bad5d3f6b..ed971a9de 100644 --- a/go.mod +++ b/go.mod @@ -39,6 +39,7 @@ require ( github.com/opencontainers/image-spec v0.0.0-20180411145040-e562b0440392 // indirect github.com/opencontainers/runc v0.0.0-20180615140650-ad0f5255060d // indirect github.com/patrickmn/go-cache v2.1.0+incompatible + github.com/pkg/errors v0.9.1 github.com/projecteru2/libyavirt v0.0.0-20191216061912-ff6d6a2732f3 github.com/prometheus/client_golang v0.9.3 github.com/sanity-io/litter v1.1.0 diff --git a/go.sum b/go.sum index 6fe3de3ed..08ac5dc7e 100644 --- a/go.sum +++ b/go.sum @@ -143,6 +143,8 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/projecteru2/libyavirt v0.0.0-20191216061912-ff6d6a2732f3 h1:PtWw3+4YVrsDNFFQaF+yyW7OuT+dl5nJQhp9RyaszTI=