Skip to content

Commit

Permalink
engine copy with raw io.reader
Browse files Browse the repository at this point in the history
  • Loading branch information
zc authored and CMGS committed Mar 6, 2020
1 parent 669aef8 commit 9a7a381
Show file tree
Hide file tree
Showing 18 changed files with 194 additions and 235 deletions.
5 changes: 3 additions & 2 deletions cluster/calcium/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"io/ioutil"
"sync"

"github.com/projecteru2/core/types"
Expand Down Expand Up @@ -124,11 +125,11 @@ func (c *Calcium) doReplaceContainer(
if err != nil {
return nil, removeMessage, err
}
fname, err := utils.TempFile(stream)
content, err := ioutil.ReadAll(stream)
if err != nil {
return nil, removeMessage, err
}
opts.DeployOptions.Data[dst] = fname
opts.DeployOptions.Data[dst] = content
}
// 停止容器
removeMessage.Hook, err = c.doStopContainer(ctx, container, opts.IgnoreHook)
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/replace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestReplaceContainer(t *testing.T) {
assert.False(t, r.Remove.Success)
}
engine.On("VirtualizationCopyFrom", mock.Anything, mock.Anything, mock.Anything).Return(ioutil.NopCloser(bytes.NewReader([]byte{})), "", nil)
opts.DeployOptions.Data = map[string]string{}
opts.DeployOptions.Data = map[string][]byte{}
// failed by Stop
engine.On("VirtualizationStop", mock.Anything, mock.Anything).Return(types.ErrCannotGetEngine).Once()
ch, err = c.ReplaceContainer(ctx, opts)
Expand Down
21 changes: 8 additions & 13 deletions cluster/calcium/send.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package calcium

import (
"bytes"
"context"
"os"
"sync"

"github.com/projecteru2/core/engine"
Expand All @@ -16,37 +16,32 @@ func (c *Calcium) Send(ctx context.Context, opts *types.SendOptions) (chan *type
go func() {
defer close(ch)
wg := &sync.WaitGroup{}
for dst, src := range opts.Data {
for dst, content := range opts.Data {
log.Infof("[Send] Send files to %s", dst)
wg.Add(1)
go func(dst, src string) {
go func(dst string, content []byte) {
defer wg.Done()
for _, ID := range opts.IDs {
container, err := c.GetContainer(ctx, ID)
if err != nil {
ch <- &types.SendMessage{ID: ID, Path: dst, Error: err}
continue
}
if err := c.doSendFileToContainer(ctx, container.Engine, container.ID, dst, src, true, true); err != nil {
if err := c.doSendFileToContainer(ctx, container.Engine, container.ID, dst, content, true, true); err != nil {
ch <- &types.SendMessage{ID: ID, Path: dst, Error: err}
continue
}
ch <- &types.SendMessage{ID: ID, Path: dst}
}
}(dst, src)
}(dst, content)
}
wg.Wait()
}()
return ch, nil
}

func (c *Calcium) doSendFileToContainer(ctx context.Context, engine engine.API, ID, dst, src string, AllowOverwriteDirWithFile bool, CopyUIDGID bool) error {
func (c *Calcium) doSendFileToContainer(ctx context.Context, engine engine.API, ID, dst string, content []byte, AllowOverwriteDirWithFile bool, CopyUIDGID bool) error {
log.Infof("[doSendFileToContainer] Send file to %s:%s", ID, dst)
log.Debugf("[doSendFileToContainer] Local file %s, remote path %s", src, dst)
f, err := os.Open(src)
if err != nil {
return err
}
defer f.Close()
return engine.VirtualizationCopyTo(ctx, ID, dst, f, AllowOverwriteDirWithFile, CopyUIDGID)
log.Debugf("[doSendFileToContainer] remote path %s", dst)
return engine.VirtualizationCopyTo(ctx, ID, dst, bytes.NewBuffer(content), AllowOverwriteDirWithFile, CopyUIDGID)
}
13 changes: 4 additions & 9 deletions cluster/calcium/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func TestSend(t *testing.T) {
defer tmpfile.Close()
opts := &types.SendOptions{
IDs: []string{"cid"},
Data: map[string]string{
"/tmp/1": "nofile",
Data: map[string][]byte{
"/tmp/1": []byte{},
},
}
store := &storemocks.Store{}
Expand All @@ -35,18 +35,13 @@ func TestSend(t *testing.T) {
for r := range ch {
assert.Error(t, r.Error)
}
// failed by no file
engine := &enginemocks.API{}
store.On("GetContainer", mock.Anything, mock.Anything).Return(
&types.Container{Engine: engine}, nil,
)
ch, err = c.Send(ctx, opts)
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
}
// failed by engine
opts.Data["/tmp/1"] = tmpfile.Name()
content, _ := ioutil.ReadAll(tmpfile)
opts.Data["/tmp/1"] = content
engine.On("VirtualizationCopyTo",
mock.Anything, mock.Anything, mock.Anything,
mock.Anything, mock.Anything, mock.Anything,
Expand Down
11 changes: 9 additions & 2 deletions engine/docker/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"

"github.com/docker/go-connections/nat"
Expand Down Expand Up @@ -192,8 +193,14 @@ func (e *Engine) VirtualizationCreate(ctx context.Context, opts *enginetypes.Vir
}

// VirtualizationCopyTo copy things to virtualization
func (e *Engine) VirtualizationCopyTo(ctx context.Context, ID, path string, content io.Reader, AllowOverwriteDirWithFile, CopyUIDGID bool) error {
return e.client.CopyToContainer(ctx, ID, filepath.Dir(path), content, dockertypes.CopyToContainerOptions{AllowOverwriteDirWithFile: AllowOverwriteDirWithFile, CopyUIDGID: CopyUIDGID})
func (e *Engine) VirtualizationCopyTo(ctx context.Context, ID, target string, content io.Reader, AllowOverwriteDirWithFile, CopyUIDGID bool) error {
return withTarfileDump(target, content, func(target, tarfile string) error {
content, err := os.Open(tarfile)
if err != nil {
return err
}
return e.client.CopyToContainer(ctx, ID, filepath.Dir(target), content, dockertypes.CopyToContainerOptions{AllowOverwriteDirWithFile: AllowOverwriteDirWithFile, CopyUIDGID: CopyUIDGID})
})
}

// VirtualizationStart start virtualization
Expand Down
23 changes: 23 additions & 0 deletions engine/docker/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package docker
import (
"bytes"
"io/ioutil"
"os"
"strings"
"testing"

coreutils "github.com/projecteru2/core/utils"
Expand All @@ -17,3 +19,24 @@ func TestCreateTarStream(t *testing.T) {
_, err = CreateTarStream(fname)
assert.NoError(t, err)
}

func TestWithDumpFiles(t *testing.T) {
data := map[string][]byte{
"/tmp/test-1": []byte("1"),
"/tmp/test-2": []byte("2"),
}
fp := []string{}
for target, content := range data {
withTarfileDump(target, bytes.NewBuffer(content), func(target, tarfile string) error {
assert.True(t, strings.HasPrefix(target, "/tmp/test"))
fp = append(fp, tarfile)
_, err := os.Stat(tarfile)
assert.Nil(t, err)
return nil
})
}
for _, path := range fp {
_, err := os.Stat(path)
assert.True(t, os.IsNotExist(err))
}
}
53 changes: 53 additions & 0 deletions engine/docker/tarfile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package docker

import (
"archive/tar"
"io"
"io/ioutil"
"os"
"path/filepath"

log "github.com/sirupsen/logrus"
)

func withTarfileDump(target string, content io.Reader, f func(target, tarfile string) error) error {
bytes, err := ioutil.ReadAll(content)
if err != nil {
return err
}
tarfile, err := tempTarFile(target, bytes)

defer func(tarfile string) {
if err := os.RemoveAll(tarfile); err != nil {
log.Warnf("[cleanDumpFiles] clean dump files failed: %v", err)
}
}(tarfile)

if err != nil {
return err
}
return f(target, tarfile)
}

func tempTarFile(path string, data []byte) (string, error) {
filename := filepath.Base(path)
f, err := ioutil.TempFile(os.TempDir(), filename)
if err != nil {
return "", err
}
name := f.Name()
defer f.Close()

tw := tar.NewWriter(f)
defer tw.Close()
hdr := &tar.Header{
Name: filename,
Mode: 0755,
Size: int64(len(data)),
}
if err := tw.WriteHeader(hdr); err != nil {
return name, err
}
_, err = tw.Write(data)
return name, err
}
2 changes: 1 addition & 1 deletion engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type API interface {
BuildContent(ctx context.Context, scm coresource.Source, opts *enginetypes.BuildOptions) (string, io.Reader, error)

VirtualizationCreate(ctx context.Context, opts *enginetypes.VirtualizationCreateOptions) (*enginetypes.VirtualizationCreated, error)
VirtualizationCopyTo(ctx context.Context, ID, path string, content io.Reader, AllowOverwriteDirWithFile, CopyUIDGID bool) error
VirtualizationCopyTo(ctx context.Context, ID, target string, content io.Reader, AllowOverwriteDirWithFile, CopyUIDGID bool) error
VirtualizationStart(ctx context.Context, ID string) error
VirtualizationStop(ctx context.Context, ID string) error
VirtualizationRemove(ctx context.Context, ID string, volumes, force bool) error
Expand Down
8 changes: 4 additions & 4 deletions engine/mocks/API.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion engine/systemd/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,13 @@ func (b *unitBuilder) buildExec() *unitBuilder {
return b
}

cmds := []string{}
for _, cmd := range b.opts.Cmd {
cmds = append(cmds, fmt.Sprintf("'%s'", cmd))
}

b.serviceBuffer = append(b.serviceBuffer, []string{
fmt.Sprintf("ExecStart=/usr/bin/cgexec -g memory,cpuset:%s %s", b.cgroupPath(), strings.Join(b.opts.Cmd, " ")),
fmt.Sprintf("ExecStart=/usr/bin/cgexec -g memory,cpuset:%s %s", b.cgroupPath(), strings.Join(cmds, " ")),
fmt.Sprintf("User=%s", user),
fmt.Sprintf("Environment=%s", strings.Join(env, " ")),
fmt.Sprintf("StandardOutput=%s", stdioType),
Expand Down
10 changes: 5 additions & 5 deletions engine/systemd/virtualization.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,22 @@ func (s *SystemdSSH) VirtualizationCreate(ctx context.Context, opts *enginetypes
}, errors.Wrap(err, stderr.String())
}

func (s *SystemdSSH) VirtualizationCopyTo(ctx context.Context, ID, path string, content io.Reader, AllowOverwriteDirWithFile, _ bool) (err error) {
func (s *SystemdSSH) VirtualizationCopyTo(ctx context.Context, ID, target string, content io.Reader, AllowOverwriteDirWithFile, _ bool) (err error) {
// mkdir -p $(dirname $PATH)
dirname, _ := filepath.Split(path)
dirname, _ := filepath.Split(target)
if _, stderr, err := s.runSingleCommand(ctx, fmt.Sprintf(cmdMkdir, dirname), nil); err != nil {
return errors.Wrap(err, stderr.String())
}

// test -f $PATH && exit -1
if !AllowOverwriteDirWithFile {
if _, _, err = s.runSingleCommand(ctx, fmt.Sprintf(cmdFileExist, path), nil); err == nil {
return fmt.Errorf("[VirtualizationCopyTo] file existed: %s", path)
if _, _, err = s.runSingleCommand(ctx, fmt.Sprintf(cmdFileExist, target), nil); err == nil {
return fmt.Errorf("[VirtualizationCopyTo] file existed: %s", target)
}
}

// cp /dev/stdin $PATH
_, stderr, err := s.runSingleCommand(ctx, fmt.Sprintf(cmdCopyFromStdin, path), content)
_, stderr, err := s.runSingleCommand(ctx, fmt.Sprintf(cmdCopyFromStdin, target), content)
return errors.Wrap(err, stderr.String())
}

Expand Down
4 changes: 2 additions & 2 deletions engine/virt/virt.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ func (v *Virt) VirtualizationCreate(ctx context.Context, opts *enginetypes.Virtu
return &enginetypes.VirtualizationCreated{ID: resp.ID, Name: opts.Name}, nil
}

// VirtualizationCopyTo copies one.
func (v *Virt) VirtualizationCopyTo(ctx context.Context, ID, path string, content io.Reader, AllowOverwriteDirWithFile, CopyUIDGID bool) (err error) {
// VirtualizationCcontentopyTo copies one.
func (v *Virt) VirtualizationCopyTo(ctx context.Context, ID, target string, content io.Reader, AllowOverwriteDirWithFile, CopyUIDGID bool) (err error) {
log.Warnf("VirtualizationCopyTo does not implement")
return
}
Expand Down
39 changes: 0 additions & 39 deletions rpc/helper.go

This file was deleted.

31 changes: 0 additions & 31 deletions rpc/helper_test.go

This file was deleted.

Loading

0 comments on commit 9a7a381

Please sign in to comment.