Skip to content

Commit

Permalink
reduce unnecessary context.Background()
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Apr 13, 2021
1 parent 6355281 commit 22411b7
Show file tree
Hide file tree
Showing 15 changed files with 47 additions and 29 deletions.
12 changes: 7 additions & 5 deletions cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio

go func() {
defer func() {
cctx, cancel := context.WithTimeout(context.Background(), c.config.GlobalTimeout)
for nodename := range deployMap {
if e := c.store.DeleteProcessing(context.Background(), opts, nodename); e != nil {
if e := c.store.DeleteProcessing(cctx, opts, nodename); e != nil {
logger.Errorf("[Calcium.doCreateWorkloads] delete processing failed for %s: %+v", nodename, e)
}
}
close(ch)
cancel()
}()

_ = utils.Txn(
Expand Down Expand Up @@ -167,7 +169,7 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
Publish: map[string][]string{},
}

pool.Go(func(idx int) func() {
pool.Go(ctx, func(idx int) func() {
return func() {
var e error
defer func() {
Expand Down Expand Up @@ -198,13 +200,13 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
}
}(idx))
}
pool.Wait()
pool.Wait(ctx)

// remap 就不搞进事务了吧, 回滚代价太大了
// 放任 remap 失败的后果是, share pool 没有更新, 这个后果姑且认为是可以承受的
// 而且 remap 是一个幂等操作, 就算这次 remap 失败, 下次 remap 也能收敛到正确到状态
if err := c.withNodeLocked(context.Background(), nodename, func(ctx context.Context, node *types.Node) error {
c.doRemapResourceAndLog(context.Background(), logger, node)
if err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
c.doRemapResourceAndLog(logger, node)
return nil
}); err != nil {
logger.Errorf("failed to lock node to remap: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/dissociate.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
}
ch <- msg
}
c.doRemapResourceAndLog(context.Background(), logger, node)
c.doRemapResourceAndLog(logger, node)
return nil
}); err != nil {
logger.WithField("nodename", nodename).Errorf("failed to lock node: %+v", err)
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC

lambda := func(message *types.CreateWorkloadMessage) {
defer func() {
if err := c.doRemoveWorkloadSync(context.Background(), []string{message.WorkloadID}); err != nil {
if err := c.doRemoveWorkloadSync(context.TODO(), []string{message.WorkloadID}); err != nil {
logger.Errorf("[RunAndWait] Remove lambda workload failed %+v", err)
} else {
log.Infof("[RunAndWait] Workload %s finished and removed", utils.ShortID(message.WorkloadID))
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/realloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, workload
return
}

c.doRemapResourceAndLog(context.Background(), log.WithField("Calcium", "doReallocOnNode"), node)
c.doRemapResourceAndLog(log.WithField("Calcium", "doReallocOnNode"), node)
return nil
})
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
}
ch <- ret
}
c.doRemapResourceAndLog(context.Background(), logger, node)
c.doRemapResourceAndLog(logger, node)
return nil
}); err != nil {
logger.WithField("nodename", nodename).Errorf("failed to lock node: %+v", err)
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ func (c *Calcium) doReplaceWorkload(
return createMessage, removeMessage, err
}

if err := c.withNodeLocked(ctx, node.Name, func(ctx context.Context, node *types.Node) error {
c.doRemapResourceAndLog(context.Background(), log.WithField("Calcium", "doReplaceWorkload"), node)
if err := c.withNodeLocked(ctx, node.Name, func(_ context.Context, node *types.Node) error {
c.doRemapResourceAndLog(log.WithField("Calcium", "doReplaceWorkload"), node)
return nil
}); err != nil {
log.Errorf("[replaceAndRemove] failed to lock node to remap: %v", err)
Expand Down
4 changes: 3 additions & 1 deletion cluster/calcium/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,10 @@ func (c *Calcium) remapResource(ctx context.Context, node *types.Node) (ch <-cha
return ch, errors.WithStack(err)
}

func (c *Calcium) doRemapResourceAndLog(ctx context.Context, logger log.Fields, node *types.Node) {
func (c *Calcium) doRemapResourceAndLog(logger log.Fields, node *types.Node) {
log.Debugf("[doRemapResourceAndLog] remap node %s", node.Name)
ctx, cancel := context.WithTimeout(context.Background(), c.config.GlobalTimeout)
defer cancel()
logger = logger.WithField("Calcium", "doRemapResourceAndLog").WithField("nodename", node.Name)
if ch, err := c.remapResource(ctx, node); logger.Err(err) == nil {
for msg := range ch {
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,5 +303,5 @@ func TestRemapResource(t *testing.T) {
_, err := c.remapResource(context.Background(), node)
assert.Nil(t, err)

c.doRemapResourceAndLog(context.Background(), log.WithField("test", "zc"), node)
c.doRemapResourceAndLog(log.WithField("test", "zc"), node)
}
2 changes: 1 addition & 1 deletion discovery/helium/helium.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func New(config types.GRPCConfig, stor store.Store) *Helium {
h.config = config
h.stor = stor
h.Do(func() {
h.start(context.Background()) // rewrite ctx here, because this will run only once!
h.start(context.TODO()) // rewrite ctx here, because this will run only once!
})
return h
}
Expand Down
4 changes: 2 additions & 2 deletions engine/docker/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (e *Engine) VirtualizationResourceRemap(ctx context.Context, opts *enginety
go func() {
defer close(ch)
for id, resource := range freeWorkloadResources {
pool.Go(func(id string, resource enginetypes.VirtualizationResource) func() {
pool.Go(ctx, func(id string, resource enginetypes.VirtualizationResource) func() {
return func() {
updateConfig := dockercontainer.UpdateConfig{Resources: dockercontainer.Resources{
CPUQuota: int64(resource.Quota * float64(corecluster.CPUPeriodBase)),
Expand All @@ -277,7 +277,7 @@ func (e *Engine) VirtualizationResourceRemap(ctx context.Context, opts *enginety
}
}(id, resource))
}
pool.Wait()
pool.Wait(ctx)
}()

return ch, nil
Expand Down
6 changes: 4 additions & 2 deletions lock/redis/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ func New(cli *redis.Client, key string, waitTimeout, lockTTL time.Duration) (*Re
// Lock acquires the lock
// will try waitTimeout time before getting the lock
func (r *RedisLock) Lock(ctx context.Context) (context.Context, error) {
return r.lock(ctx, opts)
lockCtx, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel()
return r.lock(lockCtx, opts)
}

// TryLock tries to lock
Expand All @@ -66,7 +68,7 @@ func (r *RedisLock) lock(ctx context.Context, opts *redislock.Options) (context.
}

r.l = l
return context.Background(), nil
return context.TODO(), nil // no need wrapped, not like etcd
}

// Unlock releases the lock
Expand Down
9 changes: 7 additions & 2 deletions store/etcdv3/meta/ephemeral.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,24 @@ func (e *ETCD) StartEphemeral(ctx context.Context, path string, heartbeat time.D
defer tick.Stop()

revoke := func() {
if _, err := e.cliv3.Revoke(context.Background(), lease.ID); err != nil {
cctx, ccancel := context.WithTimeout(context.Background(), time.Minute) // todo minute sucks
defer ccancel()
if _, err := e.cliv3.Revoke(cctx, lease.ID); err != nil {
log.Errorf("[StartEphemeral] revoke %d with %s failed: %v", lease.ID, path, err)
}
}

for {
select {
case <-tick.C:
if _, err := e.cliv3.KeepAliveOnce(context.Background(), lease.ID); err != nil {
cctx, ccancel := context.WithTimeout(ctx, time.Minute) // todo minute sucks
if _, err := e.cliv3.KeepAliveOnce(cctx, lease.ID); err != nil {
log.Errorf("[StartEphemeral] keepalive %d with %s failed: %v", lease.ID, path, err)
ccancel()
revoke()
return
}
ccancel()

case <-ctx.Done():
revoke()
Expand Down
4 changes: 3 additions & 1 deletion store/redis/rediaron.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ func (r *Rediaron) KNotify(ctx context.Context, pattern string) chan *KNotifyMes
for {
select {
case <-ctx.Done():
_ = pubsub.PUnsubscribe(context.Background(), channel)
ctx, cancel := context.WithTimeout(context.Background(), r.config.GlobalTimeout)
defer cancel()
_ = pubsub.PUnsubscribe(ctx, channel)
return
case v := <-subC:
if v == nil {
Expand Down
16 changes: 10 additions & 6 deletions utils/gopool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package utils
import (
"context"

"github.com/projecteru2/core/log"
"golang.org/x/sync/semaphore"
)

Expand All @@ -22,17 +23,20 @@ func NewGoroutinePool(max int) *GoroutinePool {
}

// Go spawns new goroutine, but may block due to max number limit
func (p *GoroutinePool) Go(f func()) {
// there won't be error once we use background ctx
p.sem.Acquire(context.Background(), 1) // nolint:errcheck
func (p *GoroutinePool) Go(ctx context.Context, f func()) {
if err := p.sem.Acquire(ctx, 1); err != nil {
log.Errorf("[GoroutinePool] Go acquire failed %v", err)
return
}
go func() {
defer p.sem.Release(1)
f()
}()
}

// Wait is equivalent to sync.WaitGroup.Wait()
func (p *GoroutinePool) Wait() {
// there won't be error once we use background ctx
p.sem.Acquire(context.Background(), p.max) // nolint:errcheck
func (p *GoroutinePool) Wait(ctx context.Context) {
if err := p.sem.Acquire(ctx, p.max); err != nil {
log.Errorf("[GoroutinePool] Wait acquire failed %v", err)
}
}
5 changes: 3 additions & 2 deletions utils/gopool_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package utils

import (
"context"
"testing"
"time"

Expand All @@ -11,11 +12,11 @@ func TestGoroutinePool(t *testing.T) {
pool := NewGoroutinePool(1)
cnt := 0
for i := 0; i < 3; i++ {
pool.Go(func() {
pool.Go(context.TODO(), func() {
time.Sleep(time.Duration(i) * 100 * time.Microsecond)
cnt++
})
}
pool.Wait()
pool.Wait(context.TODO())
assert.Equal(t, 3, cnt)
}

0 comments on commit 22411b7

Please sign in to comment.