From a98287656683775be0f899335a7f200b0f0497a9 Mon Sep 17 00:00:00 2001 From: hongji Date: Wed, 19 Jul 2023 16:36:27 +0800 Subject: [PATCH] bug fix: need to add waitgroup in the very beginning --- cluster/calcium/sendlarge.go | 6 +++--- rpc/rpc.go | 11 ++++++----- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/cluster/calcium/sendlarge.go b/cluster/calcium/sendlarge.go index b787fa336..2162fea62 100644 --- a/cluster/calcium/sendlarge.go +++ b/cluster/calcium/sendlarge.go @@ -12,18 +12,19 @@ import ( ) // SendLargeFile send large files by stream to workload -func (c *Calcium) SendLargeFile(ctx context.Context, opts chan *types.SendLargeFileOptions) chan *types.SendMessage { +func (c *Calcium) SendLargeFile(ctx context.Context, inputChan chan *types.SendLargeFileOptions) chan *types.SendMessage { resp := make(chan *types.SendMessage) wg := &sync.WaitGroup{} utils.SentryGo(func() { defer close(resp) senders := make(map[string]*workloadSender) // for each file - for data := range opts { + for data := range inputChan { for _, id := range data.Ids { if _, ok := senders[id]; !ok { log.Debugf(ctx, "[SendLargeFile] create sender for %s", id) // for each container, let's create a new sender to send identical file chunk, each chunk will include the metadata of this file + wg.Add(1) sender := c.newWorkloadSender(ctx, id, resp, wg) senders[id] = sender } @@ -65,7 +66,6 @@ func (c *Calcium) newWorkloadSender(ctx context.Context, ID string, resp chan *t } // ready to send if curFile == "" { - wg.Add(1) log.Debugf(ctx, "[newWorkloadExecutor]Receive new file %s to %s", curFile, sender.id) curFile = data.Dst pr, pw := io.Pipe() diff --git a/rpc/rpc.go b/rpc/rpc.go index 5a8e8bf36..84e2bbe87 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -607,8 +607,8 @@ func (v *Vibranium) SendLargeFile(server pb.CoreRPC_SendLargeFileServer) error { task := v.newTask(server.Context(), "SendLargeFile", true) defer task.done() - dc := make(chan *types.SendLargeFileOptions) - ch := v.cluster.SendLargeFile(task.context, dc) + inputChan := make(chan *types.SendLargeFileOptions) + resp := v.cluster.SendLargeFile(task.context, inputChan) utils.SentryGo(func() { for { req, err := server.Recv() @@ -624,12 +624,13 @@ func (v *Vibranium) SendLargeFile(server pb.CoreRPC_SendLargeFileServer) error { log.Errorf(task.context, err, "[SendLargeFile]transform data err: %v", err) return } - dc <- data + inputChan <- data } - close(dc) + + close(inputChan) }) - for m := range ch { + for m := range resp { msg := &pb.SendMessage{ Id: m.ID, Path: m.Path,