Skip to content

Commit

Permalink
ensure transaction go well
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 committed Feb 19, 2021
1 parent 6d6918b commit 51b54ed
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
10 changes: 7 additions & 3 deletions cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,11 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
if e := sem.Acquire(ctx, 1); e != nil {
log.Errorf("[Calcium.doDeployWorkloadsOnNode] Failed to acquire semaphore: %+v", e)
err = e
ch <- &types.CreateWorkloadMessage{Error: e}
appendLock.Lock()
indices = append(indices, utils.Range(deploy)[idx:]...)
indices = append(indices, idx)
appendLock.Unlock()
break
continue
}
go func(idx int) (e error) {
defer func() {
Expand Down Expand Up @@ -203,7 +204,10 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
}(idx) // nolint:errcheck
}

if e := sem.Acquire(ctx, c.config.MaxConcurrency); e != nil {
// sem.Acquire(ctx, MaxConcurrency) 等价于 WaitGroup.Wait()
// 用 context.Background() 是为了防止语义被破坏: 一定要等到所有 goroutine 完毕, 不能被用户 ctx 打断
// 否则可能会出现的情况是, 某些 goroutine 还没结束与运行 defer, 这个函数就 return 并 close channel, 导致 defer 里给 closed channel 发消息 panic
if e := sem.Acquire(context.Background(), c.config.MaxConcurrency); e != nil {
log.Errorf("[Calcium.doDeployWorkloadsOnNode] Failed to wait all workers done: %+v", e)
err = e
indices = utils.Range(deploy)
Expand Down
1 change: 0 additions & 1 deletion cluster/calcium/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ func processVirtualizationOutStream(
if split != 0 {
bs = append(bs, split)
}
println(string(bs))
outCh <- bs
}
if err := scanner.Err(); err != nil {
Expand Down

0 comments on commit 51b54ed

Please sign in to comment.