intro goroutine pool to restrain max concurrency
jschwinger233 authored and CMGS committed Mar 11, 2021
1 parent 21bc787 commit 113786d
Showing 5 changed files with 93 additions and 65 deletions.
82 changes: 36 additions & 46 deletions cluster/calcium/create.go
@@ -8,7 +8,6 @@ import (

"github.com/pkg/errors"
"github.com/sanity-io/litter"
"golang.org/x/sync/semaphore"

"github.com/projecteru2/core/cluster"
enginetypes "github.com/projecteru2/core/engine/types"
@@ -104,7 +103,10 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
},

// rollback: give back resources
func(ctx context.Context, _ bool) (err error) {
func(ctx context.Context, failedOnCond bool) (err error) {
if failedOnCond {
return
}
for nodename, rollbackIndices := range rollbackMap {
if e := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
for _, plan := range plans {
@@ -159,61 +161,46 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
return utils.Range(deploy), errors.WithStack(err)
}

sem, appendLock := semaphore.NewWeighted(c.config.MaxConcurrency), sync.Mutex{}
pool, appendLock := utils.NewGoroutinePool(int(c.config.MaxConcurrency)), sync.Mutex{}
for idx := 0; idx < deploy; idx++ {
createMsg := &types.CreateWorkloadMessage{
Podname: opts.Podname,
Nodename: nodename,
Publish: map[string][]string{},
}

if e := sem.Acquire(ctx, 1); e != nil {
logger.Errorf("[Calcium.doDeployWorkloadsOnNode] Failed to acquire semaphore: %+v", e)
err = e
ch <- &types.CreateWorkloadMessage{Error: e}
appendLock.Lock()
indices = append(indices, idx)
appendLock.Unlock()
continue
}
go func(idx int) (e error) {
defer func() {
if e != nil {
err = e
createMsg.Error = logger.Err(e)
appendLock.Lock()
indices = append(indices, idx)
appendLock.Unlock()
pool.Go(func(idx int) func() {
return func() {
var e error
defer func() {
if e != nil {
err = e
createMsg.Error = logger.Err(e)
appendLock.Lock()
indices = append(indices, idx)
appendLock.Unlock()
}
ch <- createMsg
}()

r := &types.ResourceMeta{}
o := resourcetypes.DispenseOptions{
Node: node,
Index: idx,
}
ch <- createMsg
sem.Release(1)
}()

r := &types.ResourceMeta{}
o := resourcetypes.DispenseOptions{
Node: node,
Index: idx,
}
for _, plan := range plans {
if r, e = plan.Dispense(o, r); e != nil {
return errors.WithStack(e)
for _, plan := range plans {
if r, e = plan.Dispense(o, r); e != nil {
return
}
}
}

createMsg.ResourceMeta = *r
createOpts := c.doMakeWorkloadOptions(seq+idx, createMsg, opts, node)
return errors.WithStack(c.doDeployOneWorkload(ctx, node, opts, createMsg, createOpts, deploy-1-idx))
}(idx) // nolint:errcheck
}

// sem.Acquire(ctx, MaxConcurrency) is equivalent to WaitGroup.Wait()
// context.Background() is used so the semantics cannot be broken: we must wait for every goroutine to finish and must not be interrupted by the caller's ctx
// otherwise some goroutines may not have finished (or run their defer) by the time this function returns and closes the channel, and the defer's send to the closed channel would panic
if e := sem.Acquire(context.Background(), c.config.MaxConcurrency); e != nil {
logger.Errorf("[Calcium.doDeployWorkloadsOnNode] Failed to wait all workers done: %+v", e)
err = e
indices = utils.Range(deploy)
createMsg.ResourceMeta = *r
createOpts := c.doMakeWorkloadOptions(seq+idx, createMsg, opts, node)
e = errors.WithStack(c.doDeployOneWorkload(ctx, node, opts, createMsg, createOpts, deploy-1-idx))
}
}(idx))
}
pool.Wait()

// remap is left out of the transaction: rolling it back would be too costly
// the consequence of letting a failed remap slide is that the share pool is not updated, which we consider acceptable for now
@@ -367,6 +354,9 @@ func (c *Calcium) doDeployOneWorkload(

// remove workload
func(ctx context.Context, _ bool) error {
if workload.ID == "" {
return nil
}
return errors.WithStack(c.doRemoveWorkload(ctx, workload, true))
},
c.config.GlobalTimeout,
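
A note on the new submit shape in create.go: pool.Go(func(idx int) func() { ... }(idx)) goes through an immediately-invoked factory because, before Go 1.22, the loop variable idx is shared across iterations, so each task must pin its own copy. Below is a minimal, self-contained sketch of the same submit-then-wait pattern; only NewGoroutinePool, Go and Wait come from this commit, everything else (including the assumption that the package is importable as github.com/projecteru2/core/utils) is illustrative.

package main

import (
	"fmt"

	"github.com/projecteru2/core/utils"
)

func main() {
	pool := utils.NewGoroutinePool(3) // at most 3 tasks run at once
	results := make(chan int, 10)     // buffered: senders never block

	for idx := 0; idx < 10; idx++ {
		// the factory call pins the current value of idx for this task
		pool.Go(func(idx int) func() {
			return func() { results <- idx * idx }
		}(idx))
	}

	pool.Wait()    // returns only after every submitted task has finished
	close(results) // safe to close now: no sender is left
	for r := range results {
		fmt.Println(r)
	}
}

Closing the channel only after Wait preserves exactly the ordering the removed comment warned about: the old semaphore wait served the same purpose, and breaking it risks a send on a closed channel.
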
5 changes: 4 additions & 1 deletion cluster/calcium/remove.go
@@ -95,7 +95,10 @@ func (c *Calcium) doRemoveWorkload(ctx context.Context, workload *types.Workload
return errors.WithStack(workload.Remove(ctx, force))
},
// rollback
func(ctx context.Context, _ bool) error {
func(ctx context.Context, failedByCond bool) error {
if failedByCond {
return nil
}
return errors.WithStack(c.store.AddWorkload(ctx, workload))
},
c.config.GlobalTimeout,
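
Both new rollback guards (failedOnCond in create.go, failedByCond here) follow the same rule: only undo work when the failure happened after the condition step actually ran. The sketch below shows the kind of three-phase transaction helper these callbacks are written against; the txn function is a simplified illustration, not the repository's actual helper.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// txn is an illustrative stand-in: run cond, then then; on failure call
// rollback with a flag saying whether cond itself was the failing step.
func txn(ctx context.Context, cond, then func(context.Context) error,
	rollback func(context.Context, bool) error, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	if err := cond(ctx); err != nil {
		// cond did no durable work, so rollback is told to skip undoing it
		return errors.Join(err, rollback(ctx, true))
	}
	if err := then(ctx); err != nil {
		return errors.Join(err, rollback(ctx, false))
	}
	return nil
}

func main() {
	err := txn(context.Background(),
		func(context.Context) error { return errors.New("remove failed") }, // cond
		func(context.Context) error { return nil },                         // then
		func(_ context.Context, failedByCond bool) error {
			if failedByCond {
				return nil // nothing was removed, so nothing to re-add
			}
			fmt.Println("re-adding workload")
			return nil
		},
		time.Minute)
	fmt.Println(err)
}

The same reasoning explains the new workload.ID == "" check in doDeployOneWorkload's remove-workload rollback: if creation never got far enough to produce an ID, there is nothing to remove.
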
39 changes: 21 additions & 18 deletions engine/docker/container.go
@@ -11,7 +11,6 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"time"

"github.com/docker/go-connections/nat"
@@ -20,6 +19,7 @@

corecluster "github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/utils"

dockertypes "github.com/docker/docker/api/types"
dockercontainer "github.com/docker/docker/api/types/container"
@@ -251,27 +251,30 @@ func (e *Engine) VirtualizationResourceRemap(ctx context.Context, opts *enginety
}

// update!
wg := sync.WaitGroup{}
ch := make(chan enginetypes.VirtualizationRemapMessage)
pool := utils.NewGoroutinePool(10)
for id, resource := range freeWorkloadResources {
// TODO@zc: limit the max goroutine
wg.Add(1)
go func(id string, resource enginetypes.VirtualizationResource) {
defer wg.Done()
updateConfig := dockercontainer.UpdateConfig{Resources: dockercontainer.Resources{
CPUQuota: int64(resource.Quota * float64(corecluster.CPUPeriodBase)),
CPUPeriod: corecluster.CPUPeriodBase,
CpusetCpus: shareCPUSet,
CPUShares: defaultCPUShare,
}}
_, err := e.client.ContainerUpdate(ctx, id, updateConfig)
ch <- enginetypes.VirtualizationRemapMessage{
ID: id,
Error: err,
pool.Go(func(id string, resource enginetypes.VirtualizationResource) func() {
return func() {
updateConfig := dockercontainer.UpdateConfig{Resources: dockercontainer.Resources{
CPUQuota: int64(resource.Quota * float64(corecluster.CPUPeriodBase)),
CPUPeriod: corecluster.CPUPeriodBase,
CpusetCpus: shareCPUSet,
CPUShares: defaultCPUShare,
}}
_, err := e.client.ContainerUpdate(ctx, id, updateConfig)
ch <- enginetypes.VirtualizationRemapMessage{
ID: id,
Error: err,
}
}
}(id, resource)
}(id, resource))
}
wg.Wait()

go func() {
defer close(ch)
pool.Wait()
}()
return ch, nil
}

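
The remap hunk caps concurrent ContainerUpdate calls at 10 (replacing the old TODO about limiting goroutines) and moves the wait off the calling goroutine: the channel is returned immediately and a background goroutine closes it only after pool.Wait() confirms every worker has sent its message. A stripped-down sketch of that producer shape follows; remapMessage, remapAll and ids are placeholders, and only the pool API (assumed importable as github.com/projecteru2/core/utils) comes from this commit.

package main

import (
	"fmt"

	"github.com/projecteru2/core/utils"
)

type remapMessage struct {
	ID    string
	Error error
}

func remapAll(ids []string) <-chan remapMessage {
	// buffered in this sketch so it cannot stall even if the consumer starts late
	ch := make(chan remapMessage, len(ids))
	pool := utils.NewGoroutinePool(10) // at most 10 updates in flight

	for _, id := range ids {
		pool.Go(func(id string) func() { // pin id per iteration
			return func() {
				// the real code performs the ContainerUpdate call here
				ch <- remapMessage{ID: id}
			}
		}(id))
	}

	// close ch only after every worker has sent, but off this goroutine
	// so the caller can start ranging right away
	go func() {
		defer close(ch)
		pool.Wait()
	}()
	return ch
}

func main() {
	for msg := range remapAll([]string{"a", "b", "c"}) {
		fmt.Println(msg.ID, msg.Error)
	}
}
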
1 change: 1 addition & 0 deletions engine/docker/helper.go
@@ -104,6 +104,7 @@ func makeResourceSetting(cpu float64, memory int64, cpuMap map[string]int64, num
resource := dockercontainer.Resources{}

resource.CPUQuota = 0
resource.CPUShares = defaultCPUShare
resource.CPUPeriod = corecluster.CPUPeriodBase
if cpu > 0 {
resource.CPUQuota = int64(cpu * float64(corecluster.CPUPeriodBase))
31 changes: 31 additions & 0 deletions utils/gopool.go
@@ -0,0 +1,31 @@
package utils

import (
"context"

"golang.org/x/sync/semaphore"
)

type GoroutinePool struct {
max int64
sem *semaphore.Weighted
}

func NewGoroutinePool(max int) *GoroutinePool {
return &GoroutinePool{
max: int64(max),
sem: semaphore.NewWeighted(int64(max)),
}
}

func (p *GoroutinePool) Go(f func()) {
p.sem.Acquire(context.Background(), 1)
go func() {
defer p.sem.Release(1)
f()
}()
}

func (p *GoroutinePool) Wait() {
p.sem.Acquire(context.Background(), p.max)
}
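
Wait acquires all max tokens at once, so it returns only when every running task has released its slot: the WaitGroup.Wait-style behaviour described in the comment removed from create.go. It also means a pool is effectively single-use once Wait has been called, which matches how both call sites use it, and Go ignoring Acquire's error is safe because context.Background() is never cancelled. A small self-check, not part of the commit, that exercises the concurrency bound (assuming the package is importable as github.com/projecteru2/core/utils):

package main

import (
	"fmt"
	"sync/atomic"
	"time"

	"github.com/projecteru2/core/utils"
)

func main() {
	const limit = 4
	pool := utils.NewGoroutinePool(limit)

	var running, peak int64
	for i := 0; i < 20; i++ {
		pool.Go(func() {
			cur := atomic.AddInt64(&running, 1)
			// record the highest concurrency ever observed
			for {
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			time.Sleep(10 * time.Millisecond)
			atomic.AddInt64(&running, -1)
		})
	}

	pool.Wait() // returns only after all 20 tasks have released their slots
	fmt.Printf("peak concurrency: %d (limit %d)\n", peak, limit)
}
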
