diff --git a/cluster/calcium/calcium_test.go b/cluster/calcium/calcium_test.go index 9bb021a5d..45f1b19b8 100644 --- a/cluster/calcium/calcium_test.go +++ b/cluster/calcium/calcium_test.go @@ -20,9 +20,9 @@ type dummyLock struct { } // Lock for lock -func (d *dummyLock) Lock(ctx context.Context) error { +func (d *dummyLock) Lock(ctx context.Context) (context.Context, error) { d.m.Lock() - return nil + return context.Background(), nil } // Unlock for unlock diff --git a/cluster/calcium/capacity.go b/cluster/calcium/capacity.go index 4ebb3765c..6fe6a13ba 100644 --- a/cluster/calcium/capacity.go +++ b/cluster/calcium/capacity.go @@ -18,7 +18,7 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio Total: 0, NodeCapacities: map[string]int{}, } - return msg, c.withNodesLocked(ctx, opts.Podname, opts.Nodenames, nil, false, func(nodeMap map[string]*types.Node) error { + return msg, c.withNodesLocked(ctx, opts.Podname, opts.Nodenames, nil, false, func(ctx context.Context, nodeMap map[string]*types.Node) error { if opts.DeployStrategy != strategy.Dummy { if _, msg.NodeCapacities, err = c.doAllocResource(ctx, nodeMap, opts); err != nil { return errors.WithStack(err) diff --git a/cluster/calcium/capacity_test.go b/cluster/calcium/capacity_test.go index 6d5645c89..5cd011e97 100644 --- a/cluster/calcium/capacity_test.go +++ b/cluster/calcium/capacity_test.go @@ -33,7 +33,7 @@ func TestCalculateCapacity(t *testing.T) { } store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil) lock := &lockmocks.DistributedLock{} - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) lock.On("Unlock", mock.Anything).Return(nil) store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) // failed by wrong resource diff --git a/cluster/calcium/control.go b/cluster/calcium/control.go index e225a9e49..d2e520f78 100644 --- a/cluster/calcium/control.go +++ b/cluster/calcium/control.go @@ -23,7 +23,7 @@ func (c *Calcium) ControlWorkload(ctx context.Context, IDs []string, t string, f go func(ID string) { defer wg.Done() var message []*bytes.Buffer - err := c.withWorkloadLocked(ctx, ID, func(workload *types.Workload) error { + err := c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error { var err error switch t { case cluster.WorkloadStop: diff --git a/cluster/calcium/control_test.go b/cluster/calcium/control_test.go index d1360bf85..0d3206750 100644 --- a/cluster/calcium/control_test.go +++ b/cluster/calcium/control_test.go @@ -20,7 +20,7 @@ func TestControlStart(t *testing.T) { ctx := context.Background() store := &storemocks.Store{} lock := &lockmocks.DistributedLock{} - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) lock.On("Unlock", mock.Anything).Return(nil) c.store = store store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) @@ -111,7 +111,7 @@ func TestControlStop(t *testing.T) { ctx := context.Background() store := &storemocks.Store{} lock := &lockmocks.DistributedLock{} - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) lock.On("Unlock", mock.Anything).Return(nil) c.store = store store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) @@ -156,7 +156,7 @@ func TestControlRestart(t *testing.T) { ctx := context.Background() store := &storemocks.Store{} lock := &lockmocks.DistributedLock{} - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) lock.On("Unlock", mock.Anything).Return(nil) c.store = store store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) diff --git a/cluster/calcium/copy.go b/cluster/calcium/copy.go index 04ef95fa6..7a41d3d04 100644 --- a/cluster/calcium/copy.go +++ b/cluster/calcium/copy.go @@ -23,7 +23,7 @@ func (c *Calcium) Copy(ctx context.Context, opts *types.CopyOptions) (chan *type wg.Add(1) go func(id string, paths []string) { defer wg.Done() - if err := c.withWorkloadLocked(ctx, id, func(workload *types.Workload) error { + if err := c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error { for _, path := range paths { resp, name, err := workload.Engine.VirtualizationCopyFrom(ctx, workload.ID, path) ch <- makeCopyMessage(id, name, path, err, resp) diff --git a/cluster/calcium/copy_test.go b/cluster/calcium/copy_test.go index f3f578576..17551f530 100644 --- a/cluster/calcium/copy_test.go +++ b/cluster/calcium/copy_test.go @@ -32,7 +32,7 @@ func TestCopy(t *testing.T) { } store := &storemocks.Store{} lock := &lockmocks.DistributedLock{} - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) lock.On("Unlock", mock.Anything).Return(nil) store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) c.store = store diff --git a/cluster/calcium/create.go b/cluster/calcium/create.go index d73e46296..ac8b6b31f 100644 --- a/cluster/calcium/create.go +++ b/cluster/calcium/create.go @@ -66,7 +66,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio // if: alloc resources func(ctx context.Context) error { - return c.withNodesLocked(ctx, opts.Podname, opts.Nodenames, opts.NodeLabels, false, func(nodeMap map[string]*types.Node) (err error) { + return c.withNodesLocked(ctx, opts.Podname, opts.Nodenames, opts.NodeLabels, false, func(ctx context.Context, nodeMap map[string]*types.Node) (err error) { defer func() { if err != nil { ch <- &types.CreateWorkloadMessage{Error: err} @@ -102,7 +102,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio // rollback: give back resources func(ctx context.Context, _ bool) (err error) { for nodename, rollbackIndices := range rollbackMap { - if e := c.withNodeLocked(ctx, nodename, func(node *types.Node) error { + if e := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { for _, plan := range plans { plan.RollbackChangesOnNode(node, rollbackIndices...) // nolint:scopelint } diff --git a/cluster/calcium/create_test.go b/cluster/calcium/create_test.go index a48a787e8..0f8f392f3 100644 --- a/cluster/calcium/create_test.go +++ b/cluster/calcium/create_test.go @@ -122,13 +122,13 @@ func TestCreateWorkloadTxn(t *testing.T) { // doAllocResource fails: MakeDeployStatus lock := &lockmocks.DistributedLock{} - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.Background(), nil) lock.On("Unlock", mock.Anything).Return(nil) store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil) store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nodes, nil) store.On("GetNode", - mock.AnythingOfType("*context.timerCtx"), + mock.AnythingOfType("*context.emptyCtx"), mock.AnythingOfType("string"), ).Return( func(_ context.Context, name string) (node *types.Node) { diff --git a/cluster/calcium/dissociate.go b/cluster/calcium/dissociate.go index 357087c31..b1bf07fde 100644 --- a/cluster/calcium/dissociate.go +++ b/cluster/calcium/dissociate.go @@ -15,8 +15,8 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, IDs []string) (chan *t go func() { defer close(ch) for _, ID := range IDs { - err := c.withWorkloadLocked(ctx, ID, func(workload *types.Workload) error { - return c.withNodeLocked(ctx, workload.Nodename, func(node *types.Node) (err error) { + err := c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error { + return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) (err error) { return utils.Txn( ctx, // if diff --git a/cluster/calcium/dissociate_test.go b/cluster/calcium/dissociate_test.go index 03d10007f..6b8908ec2 100644 --- a/cluster/calcium/dissociate_test.go +++ b/cluster/calcium/dissociate_test.go @@ -19,7 +19,7 @@ func TestDissociateWorkload(t *testing.T) { c.store = store lock := &lockmocks.DistributedLock{} - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) lock.On("Unlock", mock.Anything).Return(nil) c1 := &types.Workload{ diff --git a/cluster/calcium/lock.go b/cluster/calcium/lock.go index a4d5a08f2..403d9f596 100644 --- a/cluster/calcium/lock.go +++ b/cluster/calcium/lock.go @@ -11,12 +11,13 @@ import ( "github.com/projecteru2/core/types" ) -func (c *Calcium) doLock(ctx context.Context, name string, timeout time.Duration) (lock.DistributedLock, error) { +func (c *Calcium) doLock(ctx context.Context, name string, timeout time.Duration) (lock.DistributedLock, context.Context, error) { lock, err := c.store.CreateLock(name, timeout) if err != nil { - return nil, err + return nil, nil, err } - return lock, lock.Lock(ctx) + ctx, err = lock.Lock(ctx) + return lock, ctx, err } func (c *Calcium) doUnlock(ctx context.Context, lock lock.DistributedLock, msg string) error { @@ -34,54 +35,56 @@ func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.Distrib } } -func (c *Calcium) withWorkloadLocked(ctx context.Context, ID string, f func(workload *types.Workload) error) error { - return c.withWorkloadsLocked(ctx, []string{ID}, func(workloads map[string]*types.Workload) error { +func (c *Calcium) withWorkloadLocked(ctx context.Context, ID string, f func(context.Context, *types.Workload) error) error { + return c.withWorkloadsLocked(ctx, []string{ID}, func(ctx context.Context, workloads map[string]*types.Workload) error { if c, ok := workloads[ID]; ok { - return f(c) + return f(ctx, c) } return types.ErrWorkloadNotExists }) } -func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, f func(node *types.Node) error) error { - return c.withNodesLocked(ctx, "", []string{nodename}, nil, true, func(nodes map[string]*types.Node) error { +func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error { + return c.withNodesLocked(ctx, "", []string{nodename}, nil, true, func(ctx context.Context, nodes map[string]*types.Node) error { if n, ok := nodes[nodename]; ok { - return f(n) + return f(ctx, n) } return types.ErrNodeNotExists }) } -func (c *Calcium) withWorkloadsLocked(ctx context.Context, IDs []string, f func(workloads map[string]*types.Workload) error) error { +func (c *Calcium) withWorkloadsLocked(ctx context.Context, IDs []string, f func(context.Context, map[string]*types.Workload) error) error { workloads := map[string]*types.Workload{} locks := map[string]lock.DistributedLock{} - defer func() { c.doUnlockAll(ctx, locks) }() + defer func() { c.doUnlockAll(context.Background(), locks) }() cs, err := c.GetWorkloads(ctx, IDs) if err != nil { return err } + var lock lock.DistributedLock for _, workload := range cs { - lock, err := c.doLock(ctx, fmt.Sprintf(cluster.WorkloadLock, workload.ID), c.config.LockTimeout) + lock, ctx, err = c.doLock(ctx, fmt.Sprintf(cluster.WorkloadLock, workload.ID), c.config.LockTimeout) if err != nil { return err } locks[workload.ID] = lock workloads[workload.ID] = workload } - return f(workloads) + return f(ctx, workloads) } -func (c *Calcium) withNodesLocked(ctx context.Context, podname string, nodenames []string, labels map[string]string, all bool, f func(nodes map[string]*types.Node) error) error { +func (c *Calcium) withNodesLocked(ctx context.Context, podname string, nodenames []string, labels map[string]string, all bool, f func(context.Context, map[string]*types.Node) error) error { nodes := map[string]*types.Node{} locks := map[string]lock.DistributedLock{} - defer c.doUnlockAll(ctx, locks) + defer c.doUnlockAll(context.Background(), locks) ns, err := c.getNodes(ctx, podname, nodenames, labels, all) if err != nil { return err } + var lock lock.DistributedLock for _, n := range ns { - lock, err := c.doLock(ctx, fmt.Sprintf(cluster.NodeLock, podname, n.Name), c.config.LockTimeout) + lock, ctx, err = c.doLock(ctx, fmt.Sprintf(cluster.NodeLock, podname, n.Name), c.config.LockTimeout) if err != nil { return err } @@ -94,5 +97,5 @@ func (c *Calcium) withNodesLocked(ctx context.Context, podname string, nodenames } nodes[n.Name] = node } - return f(nodes) + return f(ctx, nodes) } diff --git a/cluster/calcium/lock_test.go b/cluster/calcium/lock_test.go index eade54dc9..e6061dc5e 100644 --- a/cluster/calcium/lock_test.go +++ b/cluster/calcium/lock_test.go @@ -21,18 +21,18 @@ func TestDoLock(t *testing.T) { c.store = store // create lock failed store.On("CreateLock", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once() - _, err := c.doLock(ctx, "somename", 1) + _, _, err := c.doLock(ctx, "somename", 1) assert.Error(t, err) lock := &lockmocks.DistributedLock{} store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) // lock failed - lock.On("Lock", mock.Anything).Return(types.ErrNoETCD).Once() - _, err = c.doLock(ctx, "somename", 1) + lock.On("Lock", mock.Anything).Return(context.TODO(), types.ErrNoETCD).Once() + _, _, err = c.doLock(ctx, "somename", 1) assert.Error(t, err) // success - lock.On("Lock", mock.Anything).Return(nil) - _, err = c.doLock(ctx, "somename", 1) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) + _, _, err = c.doLock(ctx, "somename", 1) assert.NoError(t, err) } @@ -57,15 +57,15 @@ func TestWithWorkloadsLocked(t *testing.T) { store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) lock.On("Unlock", mock.Anything).Return(nil) // failed to get lock - lock.On("Lock", mock.Anything).Return(types.ErrNoETCD).Once() + lock.On("Lock", mock.Anything).Return(context.TODO(), types.ErrNoETCD).Once() store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{{}}, nil).Once() - err := c.withWorkloadsLocked(ctx, []string{"c1", "c2"}, func(workloads map[string]*types.Workload) error { return nil }) + err := c.withWorkloadsLocked(ctx, []string{"c1", "c2"}, func(ctx context.Context, workloads map[string]*types.Workload) error { return nil }) assert.Error(t, err) // success - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) // failed by getworkload store.On("GetWorkloads", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once() - err = c.withWorkloadsLocked(ctx, []string{"c1", "c2"}, func(workloads map[string]*types.Workload) error { return nil }) + err = c.withWorkloadsLocked(ctx, []string{"c1", "c2"}, func(ctx context.Context, workloads map[string]*types.Workload) error { return nil }) assert.Error(t, err) engine := &enginemocks.API{} workload := &types.Workload{ @@ -74,7 +74,7 @@ func TestWithWorkloadsLocked(t *testing.T) { } store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{workload}, nil) // success - err = c.withWorkloadsLocked(ctx, []string{"c1", "c1"}, func(workloads map[string]*types.Workload) error { + err = c.withWorkloadsLocked(ctx, []string{"c1", "c1"}, func(ctx context.Context, workloads map[string]*types.Workload) error { assert.Len(t, workloads, 1) return nil }) @@ -91,15 +91,15 @@ func TestWithWorkloadLocked(t *testing.T) { store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) lock.On("Unlock", mock.Anything).Return(nil) // failed to get lock - lock.On("Lock", mock.Anything).Return(types.ErrNoETCD).Once() + lock.On("Lock", mock.Anything).Return(context.TODO(), types.ErrNoETCD).Once() store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{{}}, nil).Once() - err := c.withWorkloadLocked(ctx, "c1", func(workload *types.Workload) error { return nil }) + err := c.withWorkloadLocked(ctx, "c1", func(ctx context.Context, workload *types.Workload) error { return nil }) assert.Error(t, err) // success - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) // failed by getworkload store.On("GetWorkloads", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once() - err = c.withWorkloadLocked(ctx, "c1", func(workload *types.Workload) error { return nil }) + err = c.withWorkloadLocked(ctx, "c1", func(ctx context.Context, workload *types.Workload) error { return nil }) assert.Error(t, err) engine := &enginemocks.API{} workload := &types.Workload{ @@ -108,7 +108,7 @@ func TestWithWorkloadLocked(t *testing.T) { } store.On("GetWorkloads", mock.Anything, mock.Anything).Return([]*types.Workload{workload}, nil) // success - err = c.withWorkloadLocked(ctx, "c1", func(workload *types.Workload) error { + err = c.withWorkloadLocked(ctx, "c1", func(ctx context.Context, workload *types.Workload) error { assert.Equal(t, workload.ID, "c1") return nil }) @@ -132,12 +132,12 @@ func TestWithNodesLocked(t *testing.T) { } // failed by list nodes store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, types.ErrNoETCD).Once() - err := c.withNodesLocked(ctx, "test", nil, nil, false, func(nodes map[string]*types.Node) error { return nil }) + err := c.withNodesLocked(ctx, "test", nil, nil, false, func(ctx context.Context, nodes map[string]*types.Node) error { return nil }) assert.Error(t, err) store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil).Once() // failed by filter var ns map[string]*types.Node - err = c.withNodesLocked(ctx, "test", nil, map[string]string{"eru": "2"}, false, func(nodes map[string]*types.Node) error { + err = c.withNodesLocked(ctx, "test", nil, map[string]string{"eru": "2"}, false, func(ctx context.Context, nodes map[string]*types.Node) error { ns = nodes return nil }) @@ -146,7 +146,7 @@ func TestWithNodesLocked(t *testing.T) { store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil) // failed by getnode store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once() - err = c.withNodesLocked(ctx, "test", []string{"test"}, nil, false, func(nodes map[string]*types.Node) error { return nil }) + err = c.withNodesLocked(ctx, "test", []string{"test"}, nil, false, func(ctx context.Context, nodes map[string]*types.Node) error { return nil }) assert.Error(t, err) store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil).Once() // failed by lock @@ -154,17 +154,17 @@ func TestWithNodesLocked(t *testing.T) { store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) lock.On("Unlock", mock.Anything).Return(nil) // failed to get lock - lock.On("Lock", mock.Anything).Return(types.ErrNoETCD).Once() - err = c.withNodesLocked(ctx, "test", []string{"test"}, nil, false, func(nodes map[string]*types.Node) error { return nil }) + lock.On("Lock", mock.Anything).Return(context.TODO(), types.ErrNoETCD).Once() + err = c.withNodesLocked(ctx, "test", []string{"test"}, nil, false, func(ctx context.Context, nodes map[string]*types.Node) error { return nil }) assert.Error(t, err) - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) // failed by get locked node store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once() - err = c.withNodesLocked(ctx, "test", []string{"test"}, nil, false, func(nodes map[string]*types.Node) error { return nil }) + err = c.withNodesLocked(ctx, "test", []string{"test"}, nil, false, func(ctx context.Context, nodes map[string]*types.Node) error { return nil }) assert.Error(t, err) store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil) // success - err = c.withNodesLocked(ctx, "test", []string{"test"}, nil, false, func(nodes map[string]*types.Node) error { + err = c.withNodesLocked(ctx, "test", []string{"test"}, nil, false, func(ctx context.Context, nodes map[string]*types.Node) error { assert.Len(t, nodes, 1) return nil }) @@ -190,14 +190,14 @@ func TestWithNodeLocked(t *testing.T) { lock := &lockmocks.DistributedLock{} store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) lock.On("Unlock", mock.Anything).Return(nil) - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) // failed by get locked node store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once() - err := c.withNodeLocked(ctx, "test", func(node *types.Node) error { return nil }) + err := c.withNodeLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { return nil }) assert.Error(t, err) store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil) // success - err = c.withNodeLocked(ctx, "test", func(node *types.Node) error { + err = c.withNodeLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { assert.Equal(t, node.Name, node1.Name) return nil }) diff --git a/cluster/calcium/node.go b/cluster/calcium/node.go index 9d7d077c6..4af1299bc 100644 --- a/cluster/calcium/node.go +++ b/cluster/calcium/node.go @@ -22,7 +22,7 @@ func (c *Calcium) RemoveNode(ctx context.Context, nodename string) error { if nodename == "" { return types.ErrEmptyNodeName } - return c.withNodeLocked(ctx, nodename, func(node *types.Node) error { + return c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { ws, err := c.ListNodeWorkloads(ctx, node.Name, nil) if err != nil { return err @@ -53,7 +53,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ return nil, err } var n *types.Node - return n, c.withNodeLocked(ctx, opts.Nodename, func(node *types.Node) error { + return n, c.withNodeLocked(ctx, opts.Nodename, func(ctx context.Context, node *types.Node) error { litter.Dump(opts) opts.Normalize(node) n = node diff --git a/cluster/calcium/node_test.go b/cluster/calcium/node_test.go index 03468f347..b97986a27 100644 --- a/cluster/calcium/node_test.go +++ b/cluster/calcium/node_test.go @@ -54,7 +54,7 @@ func TestRemoveNode(t *testing.T) { c.store = store lock := &lockmocks.DistributedLock{} - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) lock.On("Unlock", mock.Anything).Return(nil) store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) @@ -136,7 +136,7 @@ func TestSetNode(t *testing.T) { c.store = store lock := &lockmocks.DistributedLock{} store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) lock.On("Unlock", mock.Anything).Return(nil) // fail by validating diff --git a/cluster/calcium/pod.go b/cluster/calcium/pod.go index 2ba7b045c..540c71a21 100644 --- a/cluster/calcium/pod.go +++ b/cluster/calcium/pod.go @@ -19,7 +19,7 @@ func (c *Calcium) RemovePod(ctx context.Context, podname string) error { if podname == "" { return types.ErrEmptyPodName } - return c.withNodesLocked(ctx, podname, []string{}, nil, true, func(nodes map[string]*types.Node) error { + return c.withNodesLocked(ctx, podname, []string{}, nil, true, func(ctx context.Context, nodes map[string]*types.Node) error { // TODO dissociate workload to node // TODO should remove node first return c.store.RemovePod(ctx, podname) diff --git a/cluster/calcium/pod_test.go b/cluster/calcium/pod_test.go index c923c2285..5f5e18409 100644 --- a/cluster/calcium/pod_test.go +++ b/cluster/calcium/pod_test.go @@ -45,7 +45,7 @@ func TestRemovePod(t *testing.T) { store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{node}, nil) store.On("GetNode", mock.Anything, mock.Anything).Return(node, nil) lock := &lockmocks.DistributedLock{} - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) lock.On("Unlock", mock.Anything).Return(nil) store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) c.store = store diff --git a/cluster/calcium/realloc.go b/cluster/calcium/realloc.go index a3143cf9d..2ff3e3be8 100644 --- a/cluster/calcium/realloc.go +++ b/cluster/calcium/realloc.go @@ -14,7 +14,7 @@ import ( // ReallocResource updates workload resource dynamically func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOptions) (err error) { - return c.withWorkloadLocked(ctx, opts.ID, func(workload *types.Workload) error { + return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error { rrs, err := resources.MakeRequests( types.ResourceOptions{ CPUQuotaRequest: workload.CPUQuotaRequest + opts.ResourceOpts.CPUQuotaRequest, @@ -38,7 +38,7 @@ func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOption // transaction: node resource func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, workload *types.Workload, rrs resourcetypes.ResourceRequests) error { - return c.withNodeLocked(ctx, nodename, func(node *types.Node) error { + return c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { node.RecycleResources(&workload.ResourceMeta) _, total, plans, err := resources.SelectNodesByResourceRequests(rrs, map[string]*types.Node{node.Name: node}) if err != nil { diff --git a/cluster/calcium/realloc_test.go b/cluster/calcium/realloc_test.go index b17036f13..0861ef045 100644 --- a/cluster/calcium/realloc_test.go +++ b/cluster/calcium/realloc_test.go @@ -38,7 +38,7 @@ func TestRealloc(t *testing.T) { c.config.Scheduler.ShareBase = 100 lock := &lockmocks.DistributedLock{} - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) lock.On("Unlock", mock.Anything).Return(nil) engine := &enginemocks.API{} @@ -283,7 +283,7 @@ func TestReallocBindCpu(t *testing.T) { Name: "p1", } lock := &lockmocks.DistributedLock{} - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) lock.On("Unlock", mock.Anything).Return(nil) store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil) diff --git a/cluster/calcium/remove.go b/cluster/calcium/remove.go index 8248edc5a..1994e3398 100644 --- a/cluster/calcium/remove.go +++ b/cluster/calcium/remove.go @@ -29,8 +29,8 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, IDs []string, force bool, go func(ID string) { defer wg.Done() ret := &types.RemoveWorkloadMessage{WorkloadID: ID, Success: false, Hook: []*bytes.Buffer{}} - if err := c.withWorkloadLocked(ctx, ID, func(workload *types.Workload) error { - return c.withNodeLocked(ctx, workload.Nodename, func(node *types.Node) (err error) { + if err := c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error { + return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) (err error) { return utils.Txn( ctx, // if diff --git a/cluster/calcium/remove_test.go b/cluster/calcium/remove_test.go index 20e0580c2..4bdf8233e 100644 --- a/cluster/calcium/remove_test.go +++ b/cluster/calcium/remove_test.go @@ -16,7 +16,7 @@ func TestRemoveWorkload(t *testing.T) { c := NewTestCluster() ctx := context.Background() lock := &lockmocks.DistributedLock{} - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) lock.On("Unlock", mock.Anything).Return(nil) store := c.store.(*storemocks.Store) store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) diff --git a/cluster/calcium/replace.go b/cluster/calcium/replace.go index f0081ab30..bdf1e51d5 100644 --- a/cluster/calcium/replace.go +++ b/cluster/calcium/replace.go @@ -48,7 +48,7 @@ func (c *Calcium) ReplaceWorkload(ctx context.Context, opts *types.ReplaceOption var createMessage *types.CreateWorkloadMessage removeMessage := &types.RemoveWorkloadMessage{WorkloadID: id} var err error - if err = c.withWorkloadLocked(ctx, id, func(workload *types.Workload) error { + if err = c.withWorkloadLocked(ctx, id, func(ctx context.Context, workload *types.Workload) error { if opts.Podname != "" && workload.Podname != opts.Podname { log.Warnf("[ReplaceWorkload] Skip not in pod workload %s", workload.ID) return types.NewDetailedErr(types.ErrIgnoreWorkload, diff --git a/cluster/calcium/replace_test.go b/cluster/calcium/replace_test.go index 915498133..1b46a2cc4 100644 --- a/cluster/calcium/replace_test.go +++ b/cluster/calcium/replace_test.go @@ -19,7 +19,7 @@ func TestReplaceWorkload(t *testing.T) { c := NewTestCluster() ctx := context.Background() lock := &lockmocks.DistributedLock{} - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) lock.On("Unlock", mock.Anything).Return(nil) store := c.store.(*storemocks.Store) store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) diff --git a/cluster/calcium/resource.go b/cluster/calcium/resource.go index 68d2fa024..677a148ba 100644 --- a/cluster/calcium/resource.go +++ b/cluster/calcium/resource.go @@ -54,7 +54,7 @@ func (c *Calcium) NodeResource(ctx context.Context, nodename string, fix bool) ( func (c *Calcium) doGetNodeResource(ctx context.Context, nodename string, fix bool) (*types.NodeResource, error) { var nr *types.NodeResource - return nr, c.withNodeLocked(ctx, nodename, func(node *types.Node) error { + return nr, c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { workloads, err := c.ListNodeWorkloads(ctx, node.Name, nil) if err != nil { return err diff --git a/cluster/calcium/resource_test.go b/cluster/calcium/resource_test.go index a8cc820b5..8687312f3 100644 --- a/cluster/calcium/resource_test.go +++ b/cluster/calcium/resource_test.go @@ -30,7 +30,7 @@ func TestPodResource(t *testing.T) { c.store = store lock := &lockmocks.DistributedLock{} store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) lock.On("Unlock", mock.Anything).Return(nil) // failed by GetNodesByPod store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once() @@ -97,7 +97,7 @@ func TestNodeResource(t *testing.T) { c.store = store lock := &lockmocks.DistributedLock{} store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) lock.On("Unlock", mock.Anything).Return(nil) node := &types.Node{ NodeMeta: types.NodeMeta{ diff --git a/cluster/calcium/send.go b/cluster/calcium/send.go index ff747aeca..c00ce4f09 100644 --- a/cluster/calcium/send.go +++ b/cluster/calcium/send.go @@ -26,7 +26,7 @@ func (c *Calcium) Send(ctx context.Context, opts *types.SendOptions) (chan *type wg.Add(1) go func(ID string) { defer wg.Done() - if err := c.withWorkloadLocked(ctx, ID, func(workload *types.Workload) error { + if err := c.withWorkloadLocked(ctx, ID, func(ctx context.Context, workload *types.Workload) error { for dst, content := range opts.Data { err := c.doSendFileToWorkload(ctx, workload.Engine, workload.ID, dst, bytes.NewBuffer(content), true, true) ch <- &types.SendMessage{ID: ID, Path: dst, Error: err} diff --git a/cluster/calcium/send_test.go b/cluster/calcium/send_test.go index de6a80911..57be7d570 100644 --- a/cluster/calcium/send_test.go +++ b/cluster/calcium/send_test.go @@ -36,7 +36,7 @@ func TestSend(t *testing.T) { store := &storemocks.Store{} c.store = store lock := &lockmocks.DistributedLock{} - lock.On("Lock", mock.Anything).Return(nil) + lock.On("Lock", mock.Anything).Return(context.TODO(), nil) lock.On("Unlock", mock.Anything).Return(nil) store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) // failed by GetWorkload diff --git a/engine/docker/mocks/APIClient.go b/engine/docker/mocks/APIClient.go index 1af641ee7..1b4642266 100644 --- a/engine/docker/mocks/APIClient.go +++ b/engine/docker/mocks/APIClient.go @@ -1,22 +1,38 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v1.1.2. DO NOT EDIT. package mocks -import container "github.com/docker/docker/api/types/container" -import context "context" -import events "github.com/docker/docker/api/types/events" -import filters "github.com/docker/docker/api/types/filters" -import http "net/http" -import image "github.com/docker/docker/api/types/image" -import io "io" -import mock "github.com/stretchr/testify/mock" -import net "net" -import network "github.com/docker/docker/api/types/network" -import registry "github.com/docker/docker/api/types/registry" -import swarm "github.com/docker/docker/api/types/swarm" -import time "time" -import types "github.com/docker/docker/api/types" -import volume "github.com/docker/docker/api/types/volume" +import ( + context "context" + + container "github.com/docker/docker/api/types/container" + + events "github.com/docker/docker/api/types/events" + + filters "github.com/docker/docker/api/types/filters" + + http "net/http" + + image "github.com/docker/docker/api/types/image" + + io "io" + + mock "github.com/stretchr/testify/mock" + + net "net" + + network "github.com/docker/docker/api/types/network" + + registry "github.com/docker/docker/api/types/registry" + + swarm "github.com/docker/docker/api/types/swarm" + + time "time" + + types "github.com/docker/docker/api/types" + + volume "github.com/docker/docker/api/types/volume" +) // APIClient is an autogenerated mock type for the APIClient type type APIClient struct { diff --git a/lock/etcdlock/mutex.go b/lock/etcdlock/mutex.go index 95c214dcb..543bf90c2 100644 --- a/lock/etcdlock/mutex.go +++ b/lock/etcdlock/mutex.go @@ -3,6 +3,7 @@ package etcdlock import ( "fmt" "strings" + "sync" "time" "github.com/projecteru2/core/types" @@ -18,6 +19,27 @@ type Mutex struct { session *concurrency.Session } +type lockContext struct { + err error + mutex sync.Mutex + context.Context +} + +func (c *lockContext) setError(err error) { + c.mutex.Lock() + defer c.mutex.Unlock() + c.err = err +} + +func (c *lockContext) Err() error { + c.mutex.Lock() + defer c.mutex.Unlock() + if c.err != nil { + return c.err + } + return c.Context.Err() +} + // New new a lock func New(cli *clientv3.Client, key string, ttl time.Duration) (*Mutex, error) { if key == "" { @@ -39,17 +61,45 @@ func New(cli *clientv3.Client, key string, ttl time.Duration) (*Mutex, error) { } // Lock get locked -func (m *Mutex) Lock(ctx context.Context) error { +func (m *Mutex) Lock(ctx context.Context) (context.Context, error) { lockCtx, cancel := context.WithTimeout(ctx, m.timeout) defer cancel() - return m.mutex.Lock(lockCtx) + + if err := m.mutex.Lock(lockCtx); err != nil { + return nil, err + } + + ctx, cancel = context.WithCancel(ctx) + rCtx := &lockContext{Context: ctx} + + go func() { + defer cancel() + + select { + case <-m.session.Done(): + rCtx.setError(types.ErrLockSessionDone) + case <-ctx.Done(): + } + }() + + return rCtx, nil } // Unlock unlock func (m *Mutex) Unlock(ctx context.Context) error { defer m.session.Close() - // 一定要释放 - lockCtx, cancel := context.WithTimeout(context.Background(), m.timeout) + // release resource + + lockCtx, cancel := context.WithTimeout(ctx, m.timeout) defer cancel() - return m.mutex.Unlock(lockCtx) + return m.unlock(lockCtx) +} + +func (m *Mutex) unlock(ctx context.Context) error { + _, err := m.session.Client().Txn(ctx).If(m.mutex.IsOwner()). + Then(clientv3.OpDelete(m.mutex.Key())).Commit() + // no way to clear it... + // m.myKey = "\x00" + // m.myRev = -1 + return err } diff --git a/lock/etcdlock/mutex_test.go b/lock/etcdlock/mutex_test.go index 1eb6d12a6..adc563429 100644 --- a/lock/etcdlock/mutex_test.go +++ b/lock/etcdlock/mutex_test.go @@ -21,8 +21,10 @@ func TestMutex(t *testing.T) { assert.NoError(t, err) ctx := context.Background() - err = mutex.Lock(ctx) + ctx, err = mutex.Lock(ctx) + assert.Nil(t, ctx.Err()) assert.NoError(t, err) err = mutex.Unlock(ctx) assert.NoError(t, err) + assert.EqualError(t, ctx.Err(), "lock session done") } diff --git a/lock/lock.go b/lock/lock.go index 16852220f..4cf8d9933 100644 --- a/lock/lock.go +++ b/lock/lock.go @@ -4,6 +4,6 @@ import "context" // DistributedLock is a lock based on something type DistributedLock interface { - Lock(ctx context.Context) error + Lock(ctx context.Context) (context.Context, error) Unlock(ctx context.Context) error } diff --git a/lock/mocks/DistributedLock.go b/lock/mocks/DistributedLock.go index d63737fef..f4b694b99 100644 --- a/lock/mocks/DistributedLock.go +++ b/lock/mocks/DistributedLock.go @@ -1,10 +1,12 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.0.0-alpha.2. DO NOT EDIT. package mocks -import context "context" +import ( + context "context" -import mock "github.com/stretchr/testify/mock" + mock "github.com/stretchr/testify/mock" +) // DistributedLock is an autogenerated mock type for the DistributedLock type type DistributedLock struct { @@ -12,17 +14,26 @@ type DistributedLock struct { } // Lock provides a mock function with given fields: ctx -func (_m *DistributedLock) Lock(ctx context.Context) error { +func (_m *DistributedLock) Lock(ctx context.Context) (context.Context, error) { ret := _m.Called(ctx) - var r0 error - if rf, ok := ret.Get(0).(func(context.Context) error); ok { + var r0 context.Context + if rf, ok := ret.Get(0).(func(context.Context) context.Context); ok { r0 = rf(ctx) } else { - r0 = ret.Error(0) + if ret.Get(0) != nil { + r0 = ret.Get(0).(context.Context) + } } - return r0 + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } // Unlock provides a mock function with given fields: ctx diff --git a/types/errors.go b/types/errors.go index 47dcf8b3e..cefe2ec8e 100644 --- a/types/errors.go +++ b/types/errors.go @@ -83,6 +83,7 @@ var ( ErrUnregisteredWALEventType = errors.New("unregistered WAL event type") ErrInvalidWALBucket = errors.New("invalid WAL bucket") + ErrLockSessionDone = errors.New("lock session done") ) // NewDetailedErr returns an error with details