diff --git a/cluster/calcium/replace.go b/cluster/calcium/replace.go index 93b450f1d..f36f32d58 100644 --- a/cluster/calcium/replace.go +++ b/cluster/calcium/replace.go @@ -115,11 +115,6 @@ func (c *Calcium) doReplaceContainer( if err = pullImage(ctx, node, opts.Image); err != nil { return nil, removeMessage, err } - // 停止容器 - removeMessage.Hook, err = c.doStopContainer(ctx, container, containerJSON, opts.IgnoreHook) - if err != nil { - return nil, removeMessage, err - } // 获得文件 io for src, dst := range opts.Copy { stream, _, err := container.Engine.VirtualizationCopyFrom(ctx, container.ID, src) @@ -132,6 +127,11 @@ func (c *Calcium) doReplaceContainer( } opts.DeployOptions.Data[dst] = fname } + // 停止容器 + removeMessage.Hook, err = c.doStopContainer(ctx, container, containerJSON, opts.IgnoreHook) + if err != nil { + return nil, removeMessage, err + } // 不涉及资源消耗,创建容器失败会被回收容器而不回收资源 // 创建成功容器会干掉之前的老容器也不会动资源,实际上实现了动态捆绑 createMessage := c.doCreateAndStartContainer(ctx, index, node, &opts.DeployOptions, container.CPU) diff --git a/rpc/helper.go b/rpc/helper.go new file mode 100644 index 000000000..14524f06b --- /dev/null +++ b/rpc/helper.go @@ -0,0 +1,39 @@ +package rpc + +import ( + "os" + + "github.com/projecteru2/core/utils" + log "github.com/sirupsen/logrus" +) + +func cleanDumpFiles(data map[string]string) error { + log.Debugf("[cleanDumpFiles] clean dump files %v", data) + var err error + for _, src := range data { + if err = os.RemoveAll(src); err != nil { + log.Errorf("[cleanDumpFiles] clean dump files failed %v", err) + } + } + return err +} + +func withDumpFiles(data map[string][]byte, f func(files map[string]string) error) error { + files := map[string]string{} + for path, data := range data { + fname, err := utils.TempTarFile(path, data) + if err != nil { + defer func() { + os.RemoveAll(fname) + for p := range files { + os.RemoveAll(p) + } + }() + return err + } + files[path] = fname + } + log.Debugf("[withDumpFiles] with temp files %v", files) + defer cleanDumpFiles(files) + return f(files) +} diff --git a/rpc/rpc.go b/rpc/rpc.go index bdf0a9219..1d1d3a23a 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -287,28 +287,30 @@ func (v *Vibranium) Send(opts *pb.SendOptions, stream pb.CoreRPC_SendServer) err if err != nil { return err } - defer cleanTmpDataFile(sendOpts.Data) - ch, err := v.cluster.Send(stream.Context(), sendOpts) - if err != nil { - return err - } - - for m := range ch { - msg := &pb.SendMessage{ - Id: m.ID, - Path: m.Path, + return withDumpFiles(opts.Data, func(files map[string]string) error { + sendOpts.Data = files + ch, err := v.cluster.Send(stream.Context(), sendOpts) + if err != nil { + return err } - if m.Error != nil { - msg.Error = m.Error.Error() - } + for m := range ch { + msg := &pb.SendMessage{ + Id: m.ID, + Path: m.Path, + } + + if m.Error != nil { + msg.Error = m.Error.Error() + } - if err := stream.Send(msg); err != nil { - v.logUnsentMessages("Send", m) + if err := stream.Send(msg); err != nil { + v.logUnsentMessages("Send", m) + } } - } - return nil + return nil + }) } // BuildImage streamed returned functions @@ -423,44 +425,47 @@ func (v *Vibranium) RunAndWait(stream pb.CoreRPC_RunAndWaitServer) error { if err != nil { return err } - defer cleanTmpDataFile(deployOpts.Data) - ch, err := v.cluster.RunAndWait(stream.Context(), deployOpts, stdinReader) - if err != nil { - // `ch` is nil now - log.Errorf("[RunAndWait] Start run and wait failed %s", err) - return err - } + return withDumpFiles(opts.Data, func(files map[string]string) error { + deployOpts.Data = files - if opts.OpenStdin { - go func() { - defer stdinWriter.Write([]byte("exit\n")) - stdinWriter.Write([]byte("echo 'Welcom to NERV...\n'\n")) - cli := []byte("echo \"`pwd`> \"\n") - stdinWriter.Write(cli) - for { - RunAndWaitOptions, err := stream.Recv() - if RunAndWaitOptions == nil || err != nil { - log.Errorf("[RunAndWait] Recv command error: %v", err) - break - } - log.Debugf("[RunAndWait] Recv command: %s", bytes.TrimRight(RunAndWaitOptions.Cmd, "\n")) - if _, err := stdinWriter.Write(RunAndWaitOptions.Cmd); err != nil { - log.Errorf("[RunAndWait] Write command error: %v", err) - break - } + ch, err := v.cluster.RunAndWait(stream.Context(), deployOpts, stdinReader) + if err != nil { + // `ch` is nil now + log.Errorf("[RunAndWait] Start run and wait failed %s", err) + return err + } + + if opts.OpenStdin { + go func() { + defer stdinWriter.Write([]byte("exit\n")) + stdinWriter.Write([]byte("echo 'Welcom to NERV...\n'\n")) + cli := []byte("echo \"`pwd`> \"\n") stdinWriter.Write(cli) - } - }() - } + for { + RunAndWaitOptions, err := stream.Recv() + if RunAndWaitOptions == nil || err != nil { + log.Errorf("[RunAndWait] Recv command error: %v", err) + break + } + log.Debugf("[RunAndWait] Recv command: %s", bytes.TrimRight(RunAndWaitOptions.Cmd, "\n")) + if _, err := stdinWriter.Write(RunAndWaitOptions.Cmd); err != nil { + log.Errorf("[RunAndWait] Write command error: %v", err) + break + } + stdinWriter.Write(cli) + } + }() + } - for m := range ch { - if err = stream.Send(toRPCRunAndWaitMessage(m)); err != nil { - v.logUnsentMessages("RunAndWait", m) + for m := range ch { + if err = stream.Send(toRPCRunAndWaitMessage(m)); err != nil { + v.logUnsentMessages("RunAndWait", m) + } } - } - return nil + return nil + }) } // CreateContainer create containers @@ -472,21 +477,23 @@ func (v *Vibranium) CreateContainer(opts *pb.DeployOptions, stream pb.CoreRPC_Cr if err != nil { return nil } - defer cleanTmpDataFile(deployOpts.Data) - // 这里考虑用全局 Background - ch, err := v.cluster.CreateContainer(context.Background(), deployOpts) - if err != nil { - return err - } + return withDumpFiles(opts.Data, func(files map[string]string) error { + deployOpts.Data = files + // 这里考虑用全局 Background + ch, err := v.cluster.CreateContainer(context.Background(), deployOpts) + if err != nil { + return err + } - for m := range ch { - if err = stream.Send(toRPCCreateContainerMessage(m)); err != nil { - v.logUnsentMessages("CreateContainer", m) + for m := range ch { + if err = stream.Send(toRPCCreateContainerMessage(m)); err != nil { + v.logUnsentMessages("CreateContainer", m) + } } - } - return nil + return nil + }) } // ReplaceContainer replace containers @@ -498,21 +505,23 @@ func (v *Vibranium) ReplaceContainer(opts *pb.ReplaceOptions, stream pb.CoreRPC_ if err != nil { return err } - defer cleanTmpDataFile(replaceOpts.DeployOptions.Data) - // 这里考虑用全局 Background - ch, err := v.cluster.ReplaceContainer(context.Background(), replaceOpts) - if err != nil { - return err - } + return withDumpFiles(opts.DeployOpt.Data, func(files map[string]string) error { + replaceOpts.Data = files + // 这里考虑用全局 Background + ch, err := v.cluster.ReplaceContainer(context.Background(), replaceOpts) + if err != nil { + return err + } - for m := range ch { - if err = stream.Send(toRPCReplaceContainerMessage(m)); err != nil { - v.logUnsentMessages("ReplaceContainer", m) + for m := range ch { + if err = stream.Send(toRPCReplaceContainerMessage(m)); err != nil { + v.logUnsentMessages("ReplaceContainer", m) + } } - } - return nil + return nil + }) } // RemoveContainer remove containers diff --git a/rpc/transform.go b/rpc/transform.go index bd35a2e99..b06e82e8d 100644 --- a/rpc/transform.go +++ b/rpc/transform.go @@ -3,7 +3,6 @@ package rpc import ( "bytes" "encoding/json" - "os" "strconv" "strings" @@ -108,14 +107,7 @@ func toCoreCopyOptions(b *pb.CopyOptions) *types.CopyOptions { } func toCoreSendOptions(b *pb.SendOptions) (*types.SendOptions, error) { - tarFiles, err := makeTempTarFiles(b.Data) - if err != nil { - return nil, err - } - return &types.SendOptions{ - IDs: b.Ids, - Data: tarFiles, - }, nil + return &types.SendOptions{IDs: b.Ids}, nil } func toCoreBuildOptions(b *pb.BuildImageOptions) (*enginetypes.BuildOptions, error) { @@ -210,11 +202,6 @@ func toCoreDeployOptions(d *pb.DeployOptions) (*types.DeployOptions, error) { entry.Hook.Force = entrypoint.Hook.Force } - tarFiles, err := makeTempTarFiles(d.Data) - if err != nil { - return nil, err - } - storage, err := toCoreDeployStorage(d.Volumes) if err != nil { return nil, err @@ -244,7 +231,6 @@ func toCoreDeployOptions(d *pb.DeployOptions) (*types.DeployOptions, error) { Labels: d.Labels, NodeLabels: d.Nodelabels, DeployMethod: d.DeployMethod, - Data: tarFiles, SoftLimit: d.SoftLimit, NodesLimit: int(d.NodesLimit), IgnoreHook: d.IgnoreHook, @@ -442,28 +428,3 @@ func toRPCLogStreamMessage(msg *types.LogStreamMessage) *pb.LogStreamMessage { } return r } - -func makeTempTarFiles(data map[string][]byte) (map[string]string, error) { - tarFiles := map[string]string{} - for path, data := range data { - fname, err := utils.TempTarFile(path, data) - if err != nil { - if fname != "" { - os.RemoveAll(fname) - } - return nil, err - } - tarFiles[path] = fname - } - return tarFiles, nil -} - -func cleanTmpDataFile(data map[string]string) error { - var err error - for _, src := range data { - if err = os.RemoveAll(src); err != nil { - log.Errorf("[cleanTmpDataFile] clean temp files failed %v", err) - } - } - return err -}