Skip to content

Commit

Permalink
wrapper rpc dump files
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Jul 4, 2019
1 parent de4edd9 commit 351833b
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 116 deletions.
10 changes: 5 additions & 5 deletions cluster/calcium/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,6 @@ func (c *Calcium) doReplaceContainer(
if err = pullImage(ctx, node, opts.Image); err != nil {
return nil, removeMessage, err
}
// 停止容器
removeMessage.Hook, err = c.doStopContainer(ctx, container, containerJSON, opts.IgnoreHook)
if err != nil {
return nil, removeMessage, err
}
// 获得文件 io
for src, dst := range opts.Copy {
stream, _, err := container.Engine.VirtualizationCopyFrom(ctx, container.ID, src)
Expand All @@ -132,6 +127,11 @@ func (c *Calcium) doReplaceContainer(
}
opts.DeployOptions.Data[dst] = fname
}
// 停止容器
removeMessage.Hook, err = c.doStopContainer(ctx, container, containerJSON, opts.IgnoreHook)
if err != nil {
return nil, removeMessage, err
}
// 不涉及资源消耗,创建容器失败会被回收容器而不回收资源
// 创建成功容器会干掉之前的老容器也不会动资源,实际上实现了动态捆绑
createMessage := c.doCreateAndStartContainer(ctx, index, node, &opts.DeployOptions, container.CPU)
Expand Down
39 changes: 39 additions & 0 deletions rpc/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package rpc

import (
"os"

"github.com/projecteru2/core/utils"
log "github.com/sirupsen/logrus"
)

func cleanDumpFiles(data map[string]string) error {
log.Debugf("[cleanDumpFiles] clean dump files %v", data)
var err error
for _, src := range data {
if err = os.RemoveAll(src); err != nil {
log.Errorf("[cleanDumpFiles] clean dump files failed %v", err)
}
}
return err
}

func withDumpFiles(data map[string][]byte, f func(files map[string]string) error) error {
files := map[string]string{}
for path, data := range data {
fname, err := utils.TempTarFile(path, data)
if err != nil {
defer func() {
os.RemoveAll(fname)
for p := range files {
os.RemoveAll(p)
}
}()
return err
}
files[path] = fname
}
log.Debugf("[withDumpFiles] with temp files %v", files)
defer cleanDumpFiles(files)
return f(files)
}
151 changes: 80 additions & 71 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,28 +287,30 @@ func (v *Vibranium) Send(opts *pb.SendOptions, stream pb.CoreRPC_SendServer) err
if err != nil {
return err
}
defer cleanTmpDataFile(sendOpts.Data)

ch, err := v.cluster.Send(stream.Context(), sendOpts)
if err != nil {
return err
}

for m := range ch {
msg := &pb.SendMessage{
Id: m.ID,
Path: m.Path,
return withDumpFiles(opts.Data, func(files map[string]string) error {
sendOpts.Data = files
ch, err := v.cluster.Send(stream.Context(), sendOpts)
if err != nil {
return err
}

if m.Error != nil {
msg.Error = m.Error.Error()
}
for m := range ch {
msg := &pb.SendMessage{
Id: m.ID,
Path: m.Path,
}

if m.Error != nil {
msg.Error = m.Error.Error()
}

if err := stream.Send(msg); err != nil {
v.logUnsentMessages("Send", m)
if err := stream.Send(msg); err != nil {
v.logUnsentMessages("Send", m)
}
}
}
return nil
return nil
})
}

// BuildImage streamed returned functions
Expand Down Expand Up @@ -423,44 +425,47 @@ func (v *Vibranium) RunAndWait(stream pb.CoreRPC_RunAndWaitServer) error {
if err != nil {
return err
}
defer cleanTmpDataFile(deployOpts.Data)

ch, err := v.cluster.RunAndWait(stream.Context(), deployOpts, stdinReader)
if err != nil {
// `ch` is nil now
log.Errorf("[RunAndWait] Start run and wait failed %s", err)
return err
}
return withDumpFiles(opts.Data, func(files map[string]string) error {
deployOpts.Data = files

if opts.OpenStdin {
go func() {
defer stdinWriter.Write([]byte("exit\n"))
stdinWriter.Write([]byte("echo 'Welcom to NERV...\n'\n"))
cli := []byte("echo \"`pwd`> \"\n")
stdinWriter.Write(cli)
for {
RunAndWaitOptions, err := stream.Recv()
if RunAndWaitOptions == nil || err != nil {
log.Errorf("[RunAndWait] Recv command error: %v", err)
break
}
log.Debugf("[RunAndWait] Recv command: %s", bytes.TrimRight(RunAndWaitOptions.Cmd, "\n"))
if _, err := stdinWriter.Write(RunAndWaitOptions.Cmd); err != nil {
log.Errorf("[RunAndWait] Write command error: %v", err)
break
}
ch, err := v.cluster.RunAndWait(stream.Context(), deployOpts, stdinReader)
if err != nil {
// `ch` is nil now
log.Errorf("[RunAndWait] Start run and wait failed %s", err)
return err
}

if opts.OpenStdin {
go func() {
defer stdinWriter.Write([]byte("exit\n"))
stdinWriter.Write([]byte("echo 'Welcom to NERV...\n'\n"))
cli := []byte("echo \"`pwd`> \"\n")
stdinWriter.Write(cli)
}
}()
}
for {
RunAndWaitOptions, err := stream.Recv()
if RunAndWaitOptions == nil || err != nil {
log.Errorf("[RunAndWait] Recv command error: %v", err)
break
}
log.Debugf("[RunAndWait] Recv command: %s", bytes.TrimRight(RunAndWaitOptions.Cmd, "\n"))
if _, err := stdinWriter.Write(RunAndWaitOptions.Cmd); err != nil {
log.Errorf("[RunAndWait] Write command error: %v", err)
break
}
stdinWriter.Write(cli)
}
}()
}

for m := range ch {
if err = stream.Send(toRPCRunAndWaitMessage(m)); err != nil {
v.logUnsentMessages("RunAndWait", m)
for m := range ch {
if err = stream.Send(toRPCRunAndWaitMessage(m)); err != nil {
v.logUnsentMessages("RunAndWait", m)
}
}
}

return nil
return nil
})
}

// CreateContainer create containers
Expand All @@ -472,21 +477,23 @@ func (v *Vibranium) CreateContainer(opts *pb.DeployOptions, stream pb.CoreRPC_Cr
if err != nil {
return nil
}
defer cleanTmpDataFile(deployOpts.Data)

// 这里考虑用全局 Background
ch, err := v.cluster.CreateContainer(context.Background(), deployOpts)
if err != nil {
return err
}
return withDumpFiles(opts.Data, func(files map[string]string) error {
deployOpts.Data = files
// 这里考虑用全局 Background
ch, err := v.cluster.CreateContainer(context.Background(), deployOpts)
if err != nil {
return err
}

for m := range ch {
if err = stream.Send(toRPCCreateContainerMessage(m)); err != nil {
v.logUnsentMessages("CreateContainer", m)
for m := range ch {
if err = stream.Send(toRPCCreateContainerMessage(m)); err != nil {
v.logUnsentMessages("CreateContainer", m)
}
}
}

return nil
return nil
})
}

// ReplaceContainer replace containers
Expand All @@ -498,21 +505,23 @@ func (v *Vibranium) ReplaceContainer(opts *pb.ReplaceOptions, stream pb.CoreRPC_
if err != nil {
return err
}
defer cleanTmpDataFile(replaceOpts.DeployOptions.Data)

// 这里考虑用全局 Background
ch, err := v.cluster.ReplaceContainer(context.Background(), replaceOpts)
if err != nil {
return err
}
return withDumpFiles(opts.DeployOpt.Data, func(files map[string]string) error {
replaceOpts.Data = files
// 这里考虑用全局 Background
ch, err := v.cluster.ReplaceContainer(context.Background(), replaceOpts)
if err != nil {
return err
}

for m := range ch {
if err = stream.Send(toRPCReplaceContainerMessage(m)); err != nil {
v.logUnsentMessages("ReplaceContainer", m)
for m := range ch {
if err = stream.Send(toRPCReplaceContainerMessage(m)); err != nil {
v.logUnsentMessages("ReplaceContainer", m)
}
}
}

return nil
return nil
})
}

// RemoveContainer remove containers
Expand Down
41 changes: 1 addition & 40 deletions rpc/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package rpc
import (
"bytes"
"encoding/json"
"os"
"strconv"
"strings"

Expand Down Expand Up @@ -108,14 +107,7 @@ func toCoreCopyOptions(b *pb.CopyOptions) *types.CopyOptions {
}

func toCoreSendOptions(b *pb.SendOptions) (*types.SendOptions, error) {
tarFiles, err := makeTempTarFiles(b.Data)
if err != nil {
return nil, err
}
return &types.SendOptions{
IDs: b.Ids,
Data: tarFiles,
}, nil
return &types.SendOptions{IDs: b.Ids}, nil
}

func toCoreBuildOptions(b *pb.BuildImageOptions) (*enginetypes.BuildOptions, error) {
Expand Down Expand Up @@ -210,11 +202,6 @@ func toCoreDeployOptions(d *pb.DeployOptions) (*types.DeployOptions, error) {
entry.Hook.Force = entrypoint.Hook.Force
}

tarFiles, err := makeTempTarFiles(d.Data)
if err != nil {
return nil, err
}

storage, err := toCoreDeployStorage(d.Volumes)
if err != nil {
return nil, err
Expand Down Expand Up @@ -244,7 +231,6 @@ func toCoreDeployOptions(d *pb.DeployOptions) (*types.DeployOptions, error) {
Labels: d.Labels,
NodeLabels: d.Nodelabels,
DeployMethod: d.DeployMethod,
Data: tarFiles,
SoftLimit: d.SoftLimit,
NodesLimit: int(d.NodesLimit),
IgnoreHook: d.IgnoreHook,
Expand Down Expand Up @@ -442,28 +428,3 @@ func toRPCLogStreamMessage(msg *types.LogStreamMessage) *pb.LogStreamMessage {
}
return r
}

func makeTempTarFiles(data map[string][]byte) (map[string]string, error) {
tarFiles := map[string]string{}
for path, data := range data {
fname, err := utils.TempTarFile(path, data)
if err != nil {
if fname != "" {
os.RemoveAll(fname)
}
return nil, err
}
tarFiles[path] = fname
}
return tarFiles, nil
}

func cleanTmpDataFile(data map[string]string) error {
var err error
for _, src := range data {
if err = os.RemoveAll(src); err != nil {
log.Errorf("[cleanTmpDataFile] clean temp files failed %v", err)
}
}
return err
}

0 comments on commit 351833b

Please sign in to comment.