Skip to content

Commit

Permalink
use context with cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
DuodenumL committed Jan 25, 2022
1 parent 0deb6e7 commit 38b68cb
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 146 deletions.
4 changes: 3 additions & 1 deletion cluster/calcium/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
// the workload should be removed if it exists
// no matter the workload exits successfully or not
defer func() {
if err := c.doRemoveWorkloadSync(context.TODO(), []string{message.WorkloadID}); err != nil {
ctx, cancel := context.WithCancel(utils.InheritTracingInfo(ctx, context.TODO()))
defer cancel()
if err := c.doRemoveWorkloadSync(ctx, []string{message.WorkloadID}); err != nil {
logger.Errorf(ctx, "[RunAndWait] Remove lambda workload failed %+v", err)
} else {
log.Infof(ctx, "[RunAndWait] Workload %s finished and removed", utils.ShortID(message.WorkloadID))
Expand Down
2 changes: 1 addition & 1 deletion engine/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func GetEngineFromCache(endpoint, ca, cert, key string) engine.API {
// RemoveEngineFromCache .
func RemoveEngineFromCache(endpoint, ca, cert, key string) {
cacheKey := getEngineCacheKey(endpoint, ca, cert, key)
log.Infof(context.TODO(), "[RemoveEngineFromCache] remove engine %v from cache", cacheKey)
log.Infof(nil, "[RemoveEngineFromCache] remove engine %v from cache", cacheKey)
engineCache.Delete(cacheKey)
}

Expand Down
29 changes: 22 additions & 7 deletions rpc/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,48 @@ import (
"golang.org/x/net/context"
)

type task struct {
v *Vibranium
name string
verbose bool
context context.Context
cancel context.CancelFunc
}

// gRPC上全局的计数器
// 只有在任务数为0的时候才给停止
// 为啥会加在gRPC server上呢?
// 因为一个入口给一个最简单了...

// 增加一个任务, 在任务调用之前要调用一次.
// 否则任务不被追踪, 不保证任务能够正常完成.
func (v *Vibranium) taskAdd(ctx context.Context, name string, verbose bool) context.Context {
func (v *Vibranium) newTask(ctx context.Context, name string, verbose bool) *task {
if ctx != nil {
ctx = context.WithValue(ctx, types.TracingID, utils.RandomString(8))
}
ctx, cancel := context.WithCancel(ctx)
if verbose {
log.Debugf(ctx, "[task] %s added", name)
}
v.counter.Add(1)
v.TaskNum++
return ctx
return &task{
v: v,
name: name,
verbose: verbose,
context: ctx,
cancel: cancel,
}
}

// 完成一个任务, 在任务执行完之后调用一次.
// 否则计数器用完不会为0, 你也别想退出这个进程了.
func (v *Vibranium) taskDone(ctx context.Context, name string, verbose bool) {
if verbose {
log.Debugf(ctx, "[task] %s done", name)
func (t *task) done() {
if t.verbose {
log.Debugf(t.context, "[task] %s done", t.name)
}
v.counter.Done()
v.TaskNum--
t.v.counter.Done()
t.v.TaskNum--
}

// Wait for all tasks done
Expand Down
4 changes: 2 additions & 2 deletions rpc/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (

func TestCounter(t *testing.T) {
v := Vibranium{}
v.taskAdd(context.TODO(), "test", true)
task := v.newTask(context.TODO(), "test", true)
assert.Equal(t, v.TaskNum, 1)

v.taskDone(context.TODO(), "test", true)
task.done()
assert.Equal(t, v.TaskNum, 0)

v.Wait()
Expand Down
Loading

0 comments on commit 38b68cb

Please sign in to comment.