From da2f392e369e51567fd29264c71da929ec408534 Mon Sep 17 00:00:00 2001 From: zc Date: Thu, 18 Feb 2021 18:36:23 +0800 Subject: [PATCH] ensure transaction goes well --- cluster/calcium/create.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/cluster/calcium/create.go b/cluster/calcium/create.go index f187f7c66..aaf8fe7eb 100644 --- a/cluster/calcium/create.go +++ b/cluster/calcium/create.go @@ -168,10 +168,11 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr if e := sem.Acquire(ctx, 1); e != nil { log.Errorf("[Calcium.doDeployWorkloadsOnNode] Failed to acquire semaphore: %+v", e) err = e + ch <- &types.CreateWorkloadMessage{Error: e} appendLock.Lock() - indices = append(indices, utils.Range(deploy)[idx:]...) + indices = append(indices, idx) appendLock.Unlock() - break + continue } go func(idx int) (e error) { defer func() { @@ -203,7 +204,10 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr }(idx) // nolint:errcheck } - if e := sem.Acquire(ctx, c.config.MaxConcurrency); e != nil { + // sem.Acquire(ctx, MaxConcurrency) 等价于 WaitGroup.Wait() + // 用 context.Background() 是为了防止语义被破坏: 一定要等到所有 goroutine 完毕, 不能被用户 ctx 打断 + // 否则可能会出现的情况是, 某些 goroutine 还没结束与运行 defer, 这个函数就 return 并 close channel, 导致 defer 里给 closed channel 发消息 panic + if e := sem.Acquire(context.Background(), c.config.MaxConcurrency); e != nil { log.Errorf("[Calcium.doDeployWorkloadsOnNode] Failed to wait all workers done: %+v", e) err = e indices = utils.Range(deploy)