From 113786d84d1fabe926e526171612b3eaa76c3235 Mon Sep 17 00:00:00 2001
From: zc
Date: Thu, 4 Mar 2021 16:34:20 +0800
Subject: [PATCH] introduce goroutine pool to restrain max concurrency

---
 cluster/calcium/create.go  | 82 +++++++++++++++++---------------------
 cluster/calcium/remove.go  |  5 ++-
 engine/docker/container.go | 39 +++++++++---------
 engine/docker/helper.go    |  1 +
 utils/gopool.go            | 31 ++++++++++++++
 5 files changed, 93 insertions(+), 65 deletions(-)
 create mode 100644 utils/gopool.go

diff --git a/cluster/calcium/create.go b/cluster/calcium/create.go
index da0bcabf9..eb1768ab0 100644
--- a/cluster/calcium/create.go
+++ b/cluster/calcium/create.go
@@ -8,7 +8,6 @@ import (
 
 	"github.com/pkg/errors"
 	"github.com/sanity-io/litter"
-	"golang.org/x/sync/semaphore"
 
 	"github.com/projecteru2/core/cluster"
 	enginetypes "github.com/projecteru2/core/engine/types"
@@ -104,7 +103,10 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
 		},
 
 		// rollback: give back resources
-		func(ctx context.Context, _ bool) (err error) {
+		func(ctx context.Context, failedOnCond bool) (err error) {
+			if failedOnCond {
+				return
+			}
 			for nodename, rollbackIndices := range rollbackMap {
 				if e := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
 					for _, plan := range plans {
@@ -159,7 +161,7 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
 		return utils.Range(deploy), errors.WithStack(err)
 	}
 
-	sem, appendLock := semaphore.NewWeighted(c.config.MaxConcurrency), sync.Mutex{}
+	pool, appendLock := utils.NewGoroutinePool(int(c.config.MaxConcurrency)), sync.Mutex{}
 	for idx := 0; idx < deploy; idx++ {
 		createMsg := &types.CreateWorkloadMessage{
 			Podname:  opts.Podname,
@@ -167,53 +169,38 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
 			Publish:  map[string][]string{},
 		}
 
-		if e := sem.Acquire(ctx, 1); e != nil {
-			logger.Errorf("[Calcium.doDeployWorkloadsOnNode] Failed to acquire semaphore: %+v", e)
-			err = e
-			ch <- &types.CreateWorkloadMessage{Error: e}
-			appendLock.Lock()
-			indices = append(indices, idx)
-			appendLock.Unlock()
-			continue
-		}
-		go func(idx int) (e error) {
-			defer func() {
-				if e != nil {
-					err = e
-					createMsg.Error = logger.Err(e)
-					appendLock.Lock()
-					indices = append(indices, idx)
-					appendLock.Unlock()
+		pool.Go(func(idx int) func() {
+			return func() {
+				var e error
+				defer func() {
+					if e != nil {
+						err = e
+						createMsg.Error = logger.Err(e)
+						appendLock.Lock()
+						indices = append(indices, idx)
+						appendLock.Unlock()
+					}
+					ch <- createMsg
+				}()
+
+				r := &types.ResourceMeta{}
+				o := resourcetypes.DispenseOptions{
+					Node:  node,
+					Index: idx,
 				}
-				ch <- createMsg
-				sem.Release(1)
-			}()
-
-			r := &types.ResourceMeta{}
-			o := resourcetypes.DispenseOptions{
-				Node:  node,
-				Index: idx,
-			}
-			for _, plan := range plans {
-				if r, e = plan.Dispense(o, r); e != nil {
-					return errors.WithStack(e)
+				for _, plan := range plans {
+					if r, e = plan.Dispense(o, r); e != nil {
+						return
+					}
 				}
-			}
-
-			createMsg.ResourceMeta = *r
-			createOpts := c.doMakeWorkloadOptions(seq+idx, createMsg, opts, node)
-			return errors.WithStack(c.doDeployOneWorkload(ctx, node, opts, createMsg, createOpts, deploy-1-idx))
-		}(idx) // nolint:errcheck
-	}
 
-	// sem.Acquire(ctx, MaxConcurrency) is equivalent to WaitGroup.Wait()
-	// context.Background() is used to keep the semantics intact: we must wait for every goroutine to finish and must not be interrupted by the caller's ctx
-	// otherwise this function could return and close the channel before some goroutines have run their defers, and sending on the closed channel in those defers would panic
-	if e := sem.Acquire(context.Background(), c.config.MaxConcurrency); e != nil {
-		logger.Errorf("[Calcium.doDeployWorkloadsOnNode] Failed to wait all workers done: %+v", e)
-		err = e
-		indices = utils.Range(deploy)
+				createMsg.ResourceMeta = *r
+				createOpts := c.doMakeWorkloadOptions(seq+idx, createMsg, opts, node)
+				e = errors.WithStack(c.doDeployOneWorkload(ctx, node, opts, createMsg, createOpts, deploy-1-idx))
+			}
+		}(idx))
 	}
+	pool.Wait()
 
 	// remap is deliberately kept out of the transaction: rolling it back would be too costly
 	// if remap fails, the share pool is simply not updated, which we regard as an acceptable consequence
@@ -367,6 +354,9 @@ func (c *Calcium) doDeployOneWorkload(
 
 		// remove workload
 		func(ctx context.Context, _ bool) error {
+			if workload.ID == "" {
+				return nil
+			}
 			return errors.WithStack(c.doRemoveWorkload(ctx, workload, true))
 		},
 		c.config.GlobalTimeout,
diff --git a/cluster/calcium/remove.go b/cluster/calcium/remove.go
index 1b84b6493..be82ed8ee 100644
--- a/cluster/calcium/remove.go
+++ b/cluster/calcium/remove.go
@@ -95,7 +95,10 @@ func (c *Calcium) doRemoveWorkload(ctx context.Context, workload *types.Workload
 			return errors.WithStack(workload.Remove(ctx, force))
 		},
 		// rollback
-		func(ctx context.Context, _ bool) error {
+		func(ctx context.Context, failedByCond bool) error {
+			if failedByCond {
+				return nil
+			}
 			return errors.WithStack(c.store.AddWorkload(ctx, workload))
 		},
 		c.config.GlobalTimeout,
diff --git a/engine/docker/container.go b/engine/docker/container.go
index a73eb2490..0e8ffc2aa 100644
--- a/engine/docker/container.go
+++ b/engine/docker/container.go
@@ -11,7 +11,6 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/docker/go-connections/nat"
@@ -20,6 +19,7 @@ import (
 
 	corecluster "github.com/projecteru2/core/cluster"
 	"github.com/projecteru2/core/log"
+	"github.com/projecteru2/core/utils"
 
 	dockertypes "github.com/docker/docker/api/types"
 	dockercontainer "github.com/docker/docker/api/types/container"
@@ -251,27 +251,30 @@ func (e *Engine) VirtualizationResourceRemap(ctx context.Context, opts *enginety
 	}
 
 	// update!
-	wg := sync.WaitGroup{}
 	ch := make(chan enginetypes.VirtualizationRemapMessage)
+	pool := utils.NewGoroutinePool(10)
 	for id, resource := range freeWorkloadResources {
-		// TODO@zc: limit the max goroutine
-		wg.Add(1)
-		go func(id string, resource enginetypes.VirtualizationResource) {
-			defer wg.Done()
-			updateConfig := dockercontainer.UpdateConfig{Resources: dockercontainer.Resources{
-				CPUQuota:   int64(resource.Quota * float64(corecluster.CPUPeriodBase)),
-				CPUPeriod:  corecluster.CPUPeriodBase,
-				CpusetCpus: shareCPUSet,
-				CPUShares:  defaultCPUShare,
-			}}
-			_, err := e.client.ContainerUpdate(ctx, id, updateConfig)
-			ch <- enginetypes.VirtualizationRemapMessage{
-				ID:    id,
-				Error: err,
+		pool.Go(func(id string, resource enginetypes.VirtualizationResource) func() {
+			return func() {
+				updateConfig := dockercontainer.UpdateConfig{Resources: dockercontainer.Resources{
+					CPUQuota:   int64(resource.Quota * float64(corecluster.CPUPeriodBase)),
+					CPUPeriod:  corecluster.CPUPeriodBase,
+					CpusetCpus: shareCPUSet,
+					CPUShares:  defaultCPUShare,
+				}}
+				_, err := e.client.ContainerUpdate(ctx, id, updateConfig)
+				ch <- enginetypes.VirtualizationRemapMessage{
+					ID:    id,
+					Error: err,
+				}
 			}
-		}(id, resource)
+		}(id, resource))
 	}
-	wg.Wait()
+
+	go func() {
+		defer close(ch)
+		pool.Wait()
+	}()
 	return ch, nil
 }
diff --git a/engine/docker/helper.go b/engine/docker/helper.go
index 3a19622dd..80ba868ad 100644
--- a/engine/docker/helper.go
+++ b/engine/docker/helper.go
@@ -104,6 +104,7 @@ func makeResourceSetting(cpu float64, memory int64, cpuMap map[string]int64, num
 	resource := dockercontainer.Resources{}
 	resource.CPUQuota = 0
+	resource.CPUShares = defaultCPUShare
 	resource.CPUPeriod = corecluster.CPUPeriodBase
 	if cpu > 0 {
 		resource.CPUQuota = int64(cpu * float64(corecluster.CPUPeriodBase))
diff --git a/utils/gopool.go b/utils/gopool.go
new file mode 100644
index 000000000..9c7689c05
--- /dev/null
+++ b/utils/gopool.go
@@ -0,0 +1,31 @@
+package utils
+
+import (
+	"context"
+
+	"golang.org/x/sync/semaphore"
+)
+
+type GoroutinePool struct {
+	max int64
+	sem *semaphore.Weighted
+}
+
+func NewGoroutinePool(max int) *GoroutinePool {
+	return &GoroutinePool{
+		max: int64(max),
+		sem: semaphore.NewWeighted(int64(max)),
+	}
+}
+
+func (p *GoroutinePool) Go(f func()) {
+	p.sem.Acquire(context.Background(), 1) // nolint:errcheck
+	go func() {
+		defer p.sem.Release(1)
+		f()
+	}()
+}
+
+func (p *GoroutinePool) Wait() {
+	p.sem.Acquire(context.Background(), p.max) // nolint:errcheck
+}
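
For reference, a minimal usage sketch of the new utils.GoroutinePool, showing the Go/Wait contract the call sites above rely on. The package main wrapper, task counts, sleep duration, and printed messages are illustrative only and are not part of the patch:

package main

import (
	"fmt"
	"time"

	"github.com/projecteru2/core/utils"
)

func main() {
	// At most 3 of the 10 submitted tasks run at the same time;
	// Go blocks the caller whenever all slots are taken.
	pool := utils.NewGoroutinePool(3)

	for i := 0; i < 10; i++ {
		i := i // capture the loop variable, as the patch does via func(idx int) func()
		pool.Go(func() {
			time.Sleep(100 * time.Millisecond) // stand-in for real work
			fmt.Println("task", i, "done")
		})
	}

	// Wait acquires the pool's full weight, so it returns only after every
	// submitted task has released its slot. It drains the semaphore, so call
	// it once after all Go calls and use a fresh pool for the next batch.
	pool.Wait()
	fmt.Println("all tasks finished")
}

This mirrors how doDeployWorkloadsOnNode drops the explicit semaphore bookkeeping: Go takes over the per-task Acquire/Release, and Wait replaces the final Acquire of MaxConcurrency that previously served as WaitGroup.Wait().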