Skip to content

Commit

Permalink
bug fix: need to add waitgroup in the very beginning
Browse files Browse the repository at this point in the history
  • Loading branch information
hongjijun233 committed Jul 19, 2023
1 parent 9e32ac4 commit a982876
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 8 deletions.
6 changes: 3 additions & 3 deletions cluster/calcium/sendlarge.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,19 @@ import (
)

// SendLargeFile send large files by stream to workload
func (c *Calcium) SendLargeFile(ctx context.Context, opts chan *types.SendLargeFileOptions) chan *types.SendMessage {
func (c *Calcium) SendLargeFile(ctx context.Context, inputChan chan *types.SendLargeFileOptions) chan *types.SendMessage {
resp := make(chan *types.SendMessage)
wg := &sync.WaitGroup{}
utils.SentryGo(func() {
defer close(resp)
senders := make(map[string]*workloadSender)
// for each file
for data := range opts {
for data := range inputChan {
for _, id := range data.Ids {
if _, ok := senders[id]; !ok {
log.Debugf(ctx, "[SendLargeFile] create sender for %s", id)
// for each container, let's create a new sender to send identical file chunk, each chunk will include the metadata of this file
wg.Add(1)
sender := c.newWorkloadSender(ctx, id, resp, wg)
senders[id] = sender
}
Expand Down Expand Up @@ -65,7 +66,6 @@ func (c *Calcium) newWorkloadSender(ctx context.Context, ID string, resp chan *t
}
// ready to send
if curFile == "" {
wg.Add(1)
log.Debugf(ctx, "[newWorkloadExecutor]Receive new file %s to %s", curFile, sender.id)
curFile = data.Dst
pr, pw := io.Pipe()
Expand Down
11 changes: 6 additions & 5 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,8 +607,8 @@ func (v *Vibranium) SendLargeFile(server pb.CoreRPC_SendLargeFileServer) error {
task := v.newTask(server.Context(), "SendLargeFile", true)
defer task.done()

dc := make(chan *types.SendLargeFileOptions)
ch := v.cluster.SendLargeFile(task.context, dc)
inputChan := make(chan *types.SendLargeFileOptions)
resp := v.cluster.SendLargeFile(task.context, inputChan)
utils.SentryGo(func() {
for {
req, err := server.Recv()
Expand All @@ -624,12 +624,13 @@ func (v *Vibranium) SendLargeFile(server pb.CoreRPC_SendLargeFileServer) error {
log.Errorf(task.context, err, "[SendLargeFile]transform data err: %v", err)
return
}
dc <- data
inputChan <- data
}
close(dc)

close(inputChan)
})

for m := range ch {
for m := range resp {
msg := &pb.SendMessage{
Id: m.ID,
Path: m.Path,
Expand Down

0 comments on commit a982876

Please sign in to comment.