Skip to content

Commit

Permalink
refactor: send and copy
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Nov 27, 2020
1 parent 5df7932 commit bb307d9
Show file tree
Hide file tree
Showing 11 changed files with 371 additions and 394 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ unit-test:
./source/common/... \
./strategy/... \
./scheduler/complex/... \
./rpc/. ./lock/etcdlock/... \
./rpc/. \
./lock/etcdlock/... \
./auth/simple/... \
./cluster/calcium/... \
./discovery/helium... \
Expand Down
31 changes: 10 additions & 21 deletions cluster/calcium/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"sync"

"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/types"
log "github.com/sirupsen/logrus"
)
Expand All @@ -17,30 +16,20 @@ func (c *Calcium) Copy(ctx context.Context, opts *types.CopyOptions) (chan *type
wg := sync.WaitGroup{}
log.Infof("[Copy] Copy %d workloads files", len(opts.Targets))
// workload one by one
for cid, paths := range opts.Targets {
for ID, paths := range opts.Targets {
wg.Add(1)
go func(cid string, paths []string) {
go func(ID string, paths []string) {
defer wg.Done()
workload, err := c.GetWorkload(ctx, cid)
if err != nil {
log.Errorf("[Copy] Error when get workload %s, err %v", cid, err)
ch <- makeCopyMessage(cid, cluster.CopyFailed, "", "", err, nil)
return
}
for _, path := range paths {
wg.Add(1)
go func(path string) {
defer wg.Done()
if err := c.withWorkloadLocked(ctx, ID, func(workload *types.Workload) error {
for _, path := range paths {
resp, name, err := workload.Engine.VirtualizationCopyFrom(ctx, workload.ID, path)
if err != nil {
log.Errorf("[Copy] Error during CopyFromWorkload: %v", err)
ch <- makeCopyMessage(cid, cluster.CopyFailed, "", path, err, nil)
return
}
ch <- makeCopyMessage(cid, cluster.CopyOK, name, path, nil, resp)
}(path)
ch <- makeCopyMessage(ID, name, path, err, resp)
}
return nil
}); err != nil {
ch <- makeCopyMessage(ID, "", "", err, nil)
}
}(cid, paths)
}(ID, paths)
}
wg.Wait()
}()
Expand Down
10 changes: 8 additions & 2 deletions cluster/calcium/copy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"

enginemocks "github.com/projecteru2/core/engine/mocks"
lockmocks "github.com/projecteru2/core/lock/mocks"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/types"
"github.com/stretchr/testify/mock"
Expand All @@ -24,18 +25,23 @@ func TestCopy(t *testing.T) {
},
}
store := &storemocks.Store{}
lock := &lockmocks.DistributedLock{}
lock.On("Lock", mock.Anything).Return(nil)
lock.On("Unlock", mock.Anything).Return(nil)
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
c.store = store
// failed by GetWorkload
store.On("GetWorkload", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
store.On("GetWorkloads", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
ch, err := c.Copy(ctx, opts)
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
}
workload := &types.Workload{ID: "cid"}
workloads := []*types.Workload{workload}
engine := &enginemocks.API{}
workload.Engine = engine
store.On("GetWorkload", mock.Anything, mock.Anything).Return(workload, nil)
store.On("GetWorkloads", mock.Anything, mock.Anything).Return(workloads, nil)
// failed by VirtualizationCopyFrom
engine.On("VirtualizationCopyFrom", mock.Anything, mock.Anything, mock.Anything).Return(nil, "", types.ErrNilEngine).Twice()
ch, err = c.Copy(ctx, opts)
Expand Down
26 changes: 12 additions & 14 deletions cluster/calcium/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,13 @@ func pullImage(ctx context.Context, node *types.Node, image string) error {
return nil
}

func makeCopyMessage(id, status, name, path string, err error, data io.ReadCloser) *types.CopyMessage {
func makeCopyMessage(id, name, path string, err error, data io.ReadCloser) *types.CopyMessage {
return &types.CopyMessage{
ID: id,
Status: status,
Name: name,
Path: path,
Error: err,
Data: data,
ID: id,
Name: name,
Path: path,
Error: err,
Data: data,
}
}

Expand Down Expand Up @@ -201,14 +200,13 @@ func processBuildImageStream(reader io.ReadCloser) chan *types.BuildImageMessage
message := &types.BuildImageMessage{}
err := decoder.Decode(message)
if err != nil {
if err == io.EOF {
break
if err != io.EOF {
malformed := []byte{}
_, _ = decoder.Buffered().Read(malformed)
log.Errorf("[processBuildImageStream] Decode image message failed %v, buffered: %s", err, string(malformed))
message.Error = err.Error()
ch <- message
}
malformed := []byte{}
_, _ = decoder.Buffered().Read(malformed)
log.Errorf("[processBuildImageStream] Decode image message failed %v, buffered: %s", err, string(malformed))
message.Error = err.Error()
ch <- message
break
}
ch <- message
Expand Down
24 changes: 11 additions & 13 deletions cluster/calcium/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,22 @@ func (c *Calcium) Send(ctx context.Context, opts *types.SendOptions) (chan *type
go func() {
defer close(ch)
wg := &sync.WaitGroup{}
for dst, content := range opts.Data {
log.Infof("[Send] Send files to %s", dst)

for _, ID := range opts.IDs {
log.Infof("[Send] Send files to %s", ID)
wg.Add(1)
go func(dst string, content []byte) {
go func(ID string) {
defer wg.Done()
for _, ID := range opts.IDs {
workload, err := c.GetWorkload(ctx, ID)
if err != nil {
ch <- &types.SendMessage{ID: ID, Path: dst, Error: err}
continue
}
if err := c.doSendFileToWorkload(ctx, workload.Engine, workload.ID, dst, bytes.NewBuffer(content), true, true); err != nil {
if err := c.withWorkloadLocked(ctx, ID, func(workload *types.Workload) error {
for dst, content := range opts.Data {
err := c.doSendFileToWorkload(ctx, workload.Engine, workload.ID, dst, bytes.NewBuffer(content), true, true)
ch <- &types.SendMessage{ID: ID, Path: dst, Error: err}
continue
}
ch <- &types.SendMessage{ID: ID, Path: dst}
return nil
}); err != nil {
ch <- &types.SendMessage{ID: ID, Error: err}
}
}(dst, content)
}(ID)
}
wg.Wait()
}()
Expand Down
11 changes: 8 additions & 3 deletions cluster/calcium/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

enginemocks "github.com/projecteru2/core/engine/mocks"
lockmocks "github.com/projecteru2/core/lock/mocks"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/types"
"github.com/stretchr/testify/assert"
Expand All @@ -28,16 +29,20 @@ func TestSend(t *testing.T) {
}
store := &storemocks.Store{}
c.store = store
lock := &lockmocks.DistributedLock{}
lock.On("Lock", mock.Anything).Return(nil)
lock.On("Unlock", mock.Anything).Return(nil)
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
// failed by GetWorkload
store.On("GetWorkload", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
store.On("GetWorkloads", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
ch, err := c.Send(ctx, opts)
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
}
engine := &enginemocks.API{}
store.On("GetWorkload", mock.Anything, mock.Anything).Return(
&types.Workload{Engine: engine}, nil,
store.On("GetWorkloads", mock.Anything, mock.Anything).Return(
[]*types.Workload{{ID: "cid", Engine: engine}}, nil,
)
// failed by engine
content, _ := ioutil.ReadAll(tmpfile)
Expand Down
4 changes: 0 additions & 4 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ const (
Gitlab = "gitlab"
// Github for github
Github = "github"
// CopyFailed for copy failed
CopyFailed = "failed"
// CopyOK for copy ok
CopyOK = "ok"
// CPUPeriodBase for cpu period base
CPUPeriodBase = 100000
// ERUMark mark workload controlled by eru
Expand Down
Loading

0 comments on commit bb307d9

Please sign in to comment.