diff --git a/cluster/calcium/capacity.go b/cluster/calcium/capacity.go
index fbae699eb..c686103fc 100644
--- a/cluster/calcium/capacity.go
+++ b/cluster/calcium/capacity.go
@@ -21,7 +21,7 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio
 		NodeCapacities: map[string]int{},
 	}
 
-	return msg, c.withNodesLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) error {
+	return msg, c.withNodesResourceLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) error {
 		if opts.DeployStrategy != strategy.Dummy {
 			if _, msg.NodeCapacities, err = c.doAllocResource(ctx, nodeMap, opts); err != nil {
 				logger.Errorf(ctx, "[Calcium.CalculateCapacity] doAllocResource failed: %+v", err)
diff --git a/cluster/calcium/create.go b/cluster/calcium/create.go
index 66c1afa4a..111b8b78c 100644
--- a/cluster/calcium/create.go
+++ b/cluster/calcium/create.go
@@ -95,7 +95,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
 			ch <- &types.CreateWorkloadMessage{Error: logger.Err(ctx, err)}
 		}
 	}()
-	return c.withNodesLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) (err error) {
+	return c.withNodesResourceLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) (err error) {
 		// calculate plans
 		if plans, deployMap, err = c.doAllocResource(ctx, nodeMap, opts); err != nil {
 			return err
@@ -136,7 +136,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
 			return
 		}
 		for nodename, rollbackIndices := range rollbackMap {
-			if e := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
+			if e := c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
 				for _, plan := range plans {
 					plan.RollbackChangesOnNode(node, rollbackIndices...) // nolint:scopelint
 				}
@@ -249,12 +249,8 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
 
 	// Don't wrap remap in the transaction; the cost of rolling it back would be too high.
 	// If remap fails, the consequence is that the share pool is not updated, which we deem acceptable.
 	// Besides, remap is idempotent: even if this run fails, the next remap will converge to the correct state.
-	if err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
-		c.doRemapResourceAndLog(ctx, logger, node)
-		return nil
-	}); err != nil {
-		logger.Errorf(ctx, "failed to lock node to remap: %v", err)
-	}
+	go c.doRemapResourceAndLog(ctx, logger, node)
+
	return indices, err
 }
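
The change above drops the per-node lock around remap and fires doRemapResourceAndLog in a goroutine instead, relying on remap being idempotent. Below is a minimal, self-contained sketch of that fire-and-forget pattern — remap and deploy here are hypothetical stand-ins, not eru-core functions — showing the one subtlety that matters: the goroutine needs a context detached from the request so that the caller returning does not cancel the remap.

// Sketch only: `remap` stands in for doRemapResourceAndLog, `deploy` for the
// deploy path. The goroutine gets its own timeout context, detached from the
// request's, so returning from the RPC does not abort the remap; because
// remap is idempotent, a failed run is only logged and the next run converges.
package main

import (
	"context"
	"log"
	"time"
)

func remap(ctx context.Context, node string) error {
	// ... recompute and apply the share pool for the node ...
	return ctx.Err()
}

func deploy(node string) {
	// resource accounting happens here, under the resource lock

	go func() {
		// detach from the caller: fresh context with its own deadline
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		if err := remap(ctx, node); err != nil {
			log.Printf("remap %s failed (will converge on next run): %v", node, err)
		}
	}()
}

func main() {
	deploy("node1")
	time.Sleep(time.Second) // give the detached goroutine a chance to finish
}
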
diff --git a/cluster/calcium/dissociate.go b/cluster/calcium/dissociate.go
index 9f954f269..be16bf066 100644
--- a/cluster/calcium/dissociate.go
+++ b/cluster/calcium/dissociate.go
@@ -25,7 +25,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
 		defer close(ch)
 
 		for nodename, workloadIDs := range nodeWorkloadGroup {
-			if err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
+			if err := c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
 				for _, workloadID := range workloadIDs { // nolint:scopelint
 					msg := &types.DissociateWorkloadMessage{WorkloadID: workloadID} // nolint:scopelint
 					if err := c.withWorkloadLocked(ctx, workloadID, func(ctx context.Context, workload *types.Workload) error {
@@ -57,7 +57,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
 					}
 					ch <- msg
 				}
-				c.doRemapResourceAndLog(ctx, logger, node)
+				go c.doRemapResourceAndLog(ctx, logger, node)
 				return nil
 			}); err != nil {
 				logger.WithField("nodename", nodename).Errorf(ctx, "failed to lock node: %+v", err)
diff --git a/cluster/calcium/dissociate_test.go b/cluster/calcium/dissociate_test.go
index 884a50c99..7742f61b5 100644
--- a/cluster/calcium/dissociate_test.go
+++ b/cluster/calcium/dissociate_test.go
@@ -3,6 +3,7 @@ package calcium
 import (
 	"context"
 	"testing"
+	"time"
 
 	lockmocks "github.com/projecteru2/core/lock/mocks"
 	storemocks "github.com/projecteru2/core/store/mocks"
@@ -66,9 +67,11 @@ func TestDissociateWorkload(t *testing.T) {
 	for r := range ch {
 		assert.Error(t, r.Error)
 	}
+	time.Sleep(time.Second)
 	store.AssertExpectations(t)
 
 	store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(nil)
+	// success
 	ch, err = c.DissociateWorkload(ctx, []string{"c1"})
 	assert.NoError(t, err)
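
Because remap now runs on a detached goroutine, the test has to wait (the new time.Sleep(time.Second)) before store.AssertExpectations, trading determinism for simplicity. Below is a hedged alternative sketch — not what this patch does — using a done channel with a bounded timeout instead of a blind sleep.

// Hypothetical test-synchronization sketch: signal a channel when the
// background work finishes, then wait with a bounded timeout rather than
// sleeping a fixed second and hoping the goroutine is done.
package main

import (
	"fmt"
	"time"
)

func main() {
	done := make(chan struct{})

	go func() {
		// stands in for the detached remap goroutine
		close(done)
	}()

	select {
	case <-done: // deterministic: proceed the moment the goroutine finishes
	case <-time.After(5 * time.Second): // bounded wait instead of a blind sleep
		fmt.Println("timed out waiting for remap")
	}
}
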
diff --git a/cluster/calcium/lock.go b/cluster/calcium/lock.go
index edbf4fcd9..f1fb7228d 100644
--- a/cluster/calcium/lock.go
+++ b/cluster/calcium/lock.go
@@ -65,12 +65,20 @@ func (c *Calcium) withWorkloadLocked(ctx context.Context, id string, f func(cont
 	})
 }
 
-func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error {
+func (c *Calcium) withNodeResourceLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error {
+	return c.withNodeLocked(ctx, nodename, cluster.NodeResourceLock, f)
+}
+
+func (c *Calcium) withNodeOperationLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error {
+	return c.withNodeLocked(ctx, nodename, cluster.NodeOperationLock, f)
+}
+
+func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, lockKeyPattern string, f func(context.Context, *types.Node) error) error {
 	nf := types.NodeFilter{
 		Includes: []string{nodename},
 		All:      true,
 	}
-	return c.withNodesLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error {
+	return c.withNodesLocked(ctx, nf, lockKeyPattern, func(ctx context.Context, nodes map[string]*types.Node) error {
 		if n, ok := nodes[nodename]; ok {
 			return f(ctx, n)
 		}
@@ -108,9 +116,17 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(
 	return f(ctx, workloads)
 }
 
+func (c *Calcium) withNodesOperationLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error { // nolint
+	return c.withNodesLocked(ctx, nf, cluster.NodeOperationLock, f)
+}
+
+func (c *Calcium) withNodesResourceLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error {
+	return c.withNodesLocked(ctx, nf, cluster.NodeResourceLock, f)
+}
+
 // withNodesLocked will use NodeFilter `nf` to filter nodes
 // and lock the corresponding nodes for the callback function `f` to use
-func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error {
+func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, lockKeyPattern string, f func(context.Context, map[string]*types.Node) error) error {
 	nodenames := []string{}
 	nodes := map[string]*types.Node{}
 	locks := map[string]lock.DistributedLock{}
@@ -127,7 +143,7 @@ func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f fu
 
 	var lock lock.DistributedLock
 	for _, n := range ns {
-		lock, ctx, err = c.doLock(ctx, fmt.Sprintf(cluster.NodeLock, n.Podname, n.Name), c.config.LockTimeout)
+		lock, ctx, err = c.doLock(ctx, fmt.Sprintf(lockKeyPattern, n.Podname, n.Name), c.config.LockTimeout)
 		if err != nil {
 			return err
 		}
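
withNodeLocked now takes the lock key pattern as a parameter, so the same node resolves to two independent distributed-lock keys. A tiny sketch of the key derivation follows; the constants are the ones added to cluster/cluster.go at the end of this diff.

// Sketch of how the two lock flavours map to distinct keys for the same
// node. Only the Sprintf derivation is shown; the doLock machinery itself
// is unchanged and not reproduced here.
package main

import "fmt"

const (
	nodeResourceLock  = "cnode_%s_%s"    // cluster.NodeResourceLock
	nodeOperationLock = "cnode_op_%s_%s" // cluster.NodeOperationLock
)

func main() {
	podname, nodename := "testpod", "node1"
	fmt.Printf("%s\n", fmt.Sprintf(nodeResourceLock, podname, nodename))  // cnode_testpod_node1
	fmt.Printf("%s\n", fmt.Sprintf(nodeOperationLock, podname, nodename)) // cnode_op_testpod_node1
	// Different keys, so a resource-lock holder and an operation-lock
	// holder on the same node never block each other.
}
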
"test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil }) + err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil }) assert.Error(t, err) store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil).Once() // failed by lock @@ -156,23 +156,23 @@ func TestWithNodesLocked(t *testing.T) { lock.On("Unlock", mock.Anything).Return(nil) // failed to get lock lock.On("Lock", mock.Anything).Return(context.TODO(), types.ErrNoETCD).Once() - err = c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil }) + err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil }) assert.Error(t, err) lock.On("Lock", mock.Anything).Return(context.TODO(), nil) // failed by get locked node store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once() - err = c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil }) + err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil }) assert.Error(t, err) store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil) // success - err = c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { + err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { assert.Len(t, nodes, 1) return nil }) assert.NoError(t, err) } -func TestWithNodeLocked(t *testing.T) { +func TestWithNodeResourceLocked(t *testing.T) { c := NewTestCluster() ctx := context.Background() store := &storemocks.Store{} @@ -194,13 +194,110 @@ func TestWithNodeLocked(t *testing.T) { lock.On("Lock", mock.Anything).Return(context.TODO(), nil) // failed by get locked node store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once() - err := c.withNodeLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { return nil }) + err := c.withNodeResourceLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { return nil }) assert.Error(t, err) store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil) // success - err = c.withNodeLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { + err = c.withNodeResourceLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { assert.Equal(t, node.Name, node1.Name) return nil }) assert.NoError(t, err) } + +func TestWithNodesOperationLocked(t *testing.T) { + c := NewTestCluster() + ctx := context.Background() + store := &storemocks.Store{} + c.store = store + + node1 := &types.Node{ + NodeMeta: types.NodeMeta{ + Name: "test", + Labels: map[string]string{ + "eru": "1", + }, + }, + Available: true, + } + // failed by list nodes + store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, 
+
+func TestWithNodesOperationLocked(t *testing.T) {
+	c := NewTestCluster()
+	ctx := context.Background()
+	store := &storemocks.Store{}
+	c.store = store
+
+	node1 := &types.Node{
+		NodeMeta: types.NodeMeta{
+			Name: "test",
+			Labels: map[string]string{
+				"eru": "1",
+			},
+		},
+		Available: true,
+	}
+	// failed by list nodes
+	store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, types.ErrNoETCD).Once()
+	err := c.withNodesOperationLocked(ctx, types.NodeFilter{Podname: "test", All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
+	assert.Error(t, err)
+	store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil).Once()
+	// failed by filter
+	var ns map[string]*types.Node
+	err = c.withNodesOperationLocked(ctx, types.NodeFilter{Podname: "test", Labels: map[string]string{"eru": "2"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
+		ns = nodes
+		return nil
+	})
+	assert.NoError(t, err)
+	assert.Empty(t, ns)
+	store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil)
+	// failed by getnode
+	store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
+	err = c.withNodesOperationLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
+	assert.Error(t, err)
+	store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil).Once()
+	// failed by lock
+	lock := &lockmocks.DistributedLock{}
+	store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
+	lock.On("Unlock", mock.Anything).Return(nil)
+	// failed to get lock
+	lock.On("Lock", mock.Anything).Return(context.TODO(), types.ErrNoETCD).Once()
+	err = c.withNodesOperationLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
+	assert.Error(t, err)
+	lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
+	// failed by get locked node
+	store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
+	err = c.withNodesOperationLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
+	assert.Error(t, err)
+	store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil)
+	// success
+	err = c.withNodesOperationLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
+		assert.Len(t, nodes, 1)
+		return nil
+	})
+	assert.NoError(t, err)
+}
+
+func TestWithNodeOperationLocked(t *testing.T) {
+	c := NewTestCluster()
+	ctx := context.Background()
+	store := &storemocks.Store{}
+	c.store = store
+
+	node1 := &types.Node{
+		NodeMeta: types.NodeMeta{
+			Name: "test",
+			Labels: map[string]string{
+				"eru": "1",
+			},
+		},
+		Available: true,
+	}
+	// failed by lock
+	lock := &lockmocks.DistributedLock{}
+	store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
+	lock.On("Unlock", mock.Anything).Return(nil)
+	lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
+	// failed by get locked node
+	store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
+	err := c.withNodeOperationLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { return nil })
+	assert.Error(t, err)
+	store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil)
+	// success
+	err = c.withNodeOperationLocked(ctx, "test", func(ctx context.Context, node *types.Node) error {
+		assert.Equal(t, node.Name, node1.Name)
+		return nil
+	})
+	assert.NoError(t, err)
+
+	err = c.withNodeResourceLocked(ctx, "test", func(ctx context.Context, node *types.Node) error {
+		return c.withNodeOperationLocked(ctx, node.Name, func(ctx context.Context, node *types.Node) error {
+			assert.Equal(t, node.Name, node1.Name)
+			return nil
+		})
+	})
+	assert.NoError(t, err)
+}
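
The nested call closing TestWithNodeOperationLocked is the point of the whole split: the operation lock can be acquired while the resource lock is held, because the keys differ. Below is a local-mutex sketch of the invariant this relies on — every caller nests in the same direction, resource first, operation second; note the ordering is a convention the calling code follows, not something the lock layer enforces.

// Sketch with two in-process mutexes standing in for the two distributed
// locks. Holding both at once is safe as long as all callers acquire them
// in the same order, so no deadlock cycle can form.
package main

import (
	"fmt"
	"sync"
)

type nodeLocks struct {
	resource  sync.Mutex // guards resource accounting, cf. cnode_%s_%s
	operation sync.Mutex // guards remap/operations, cf. cnode_op_%s_%s
}

func (n *nodeLocks) withResourceThenOperation(f func()) {
	n.resource.Lock() // always first
	defer n.resource.Unlock()
	n.operation.Lock() // always second
	defer n.operation.Unlock()
	f()
}

func main() {
	var n nodeLocks
	n.withResourceThenOperation(func() { fmt.Println("held both, in order") })
}
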
diff --git a/cluster/calcium/node.go b/cluster/calcium/node.go
index d49f5990a..c33a05c7b 100644
--- a/cluster/calcium/node.go
+++ b/cluster/calcium/node.go
@@ -28,7 +28,7 @@ func (c *Calcium) RemoveNode(ctx context.Context, nodename string) error {
 	if nodename == "" {
 		return logger.Err(ctx, errors.WithStack(types.ErrEmptyNodeName))
 	}
-	return c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
+	return c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
 		ws, err := c.ListNodeWorkloads(ctx, node.Name, nil)
 		if err != nil {
 			return logger.Err(ctx, err)
@@ -91,7 +91,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
 		return nil, logger.Err(ctx, err)
 	}
 	var n *types.Node
-	return n, c.withNodeLocked(ctx, opts.Nodename, func(ctx context.Context, node *types.Node) error {
+	return n, c.withNodeResourceLocked(ctx, opts.Nodename, func(ctx context.Context, node *types.Node) error {
 		logger.Infof(ctx, "set node")
 		opts.Normalize(node)
 		n = node
diff --git a/cluster/calcium/pod.go b/cluster/calcium/pod.go
index 82a577b74..46e98a333 100644
--- a/cluster/calcium/pod.go
+++ b/cluster/calcium/pod.go
@@ -30,7 +30,7 @@ func (c *Calcium) RemovePod(ctx context.Context, podname string) error {
 		Podname: podname,
 		All:     true,
 	}
-	return c.withNodesLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error {
+	return c.withNodesResourceLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error {
 		// TODO dissociate workload to node
 		// TODO should remove node first
 		return logger.Err(ctx, errors.WithStack(c.store.RemovePod(ctx, podname)))
diff --git a/cluster/calcium/realloc.go b/cluster/calcium/realloc.go
index 0cfb014d6..f8fb8510d 100644
--- a/cluster/calcium/realloc.go
+++ b/cluster/calcium/realloc.go
@@ -21,7 +21,7 @@ func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOption
 	if err != nil {
 		return
 	}
-	return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error {
+	return c.withNodeResourceLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error {
 		return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error {
 			rrs, err := resources.MakeRequests(
@@ -108,7 +108,7 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, node *types.Node, workloa
 		return
 	}
 
-	c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
+	go c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
 	return nil
 }
"c6").Return(c6, nil) diff --git a/cluster/calcium/remove.go b/cluster/calcium/remove.go index 5bb4fba76..b73e4ae55 100644 --- a/cluster/calcium/remove.go +++ b/cluster/calcium/remove.go @@ -33,7 +33,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool, utils.SentryGo(func(nodename string, workloadIDs []string) func() { return func() { defer wg.Done() - if err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { + if err := c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { for _, workloadID := range workloadIDs { ret := &types.RemoveWorkloadMessage{WorkloadID: workloadID, Success: true, Hook: []*bytes.Buffer{}} if err := c.withWorkloadLocked(ctx, workloadID, func(ctx context.Context, workload *types.Workload) error { @@ -66,7 +66,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool, } ch <- ret } - c.doRemapResourceAndLog(ctx, logger, node) + go c.doRemapResourceAndLog(ctx, logger, node) return nil }); err != nil { logger.WithField("nodename", nodename).Errorf(ctx, "failed to lock node: %+v", err) diff --git a/cluster/calcium/replace.go b/cluster/calcium/replace.go index 4d56bc359..a2700bc3f 100644 --- a/cluster/calcium/replace.go +++ b/cluster/calcium/replace.go @@ -206,12 +206,7 @@ func (c *Calcium) doReplaceWorkload( return createMessage, removeMessage, err } - if err := c.withNodeLocked(ctx, node.Name, func(_ context.Context, node *types.Node) error { - c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReplaceWorkload"), node) - return nil - }); err != nil { - log.Errorf(ctx, "[replaceAndRemove] failed to lock node to remap: %v", err) - } + go c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReplaceWorkload"), node) return createMessage, removeMessage, err } diff --git a/cluster/calcium/resource.go b/cluster/calcium/resource.go index a10423297..a2ec67f28 100644 --- a/cluster/calcium/resource.go +++ b/cluster/calcium/resource.go @@ -64,7 +64,7 @@ func (c *Calcium) NodeResource(ctx context.Context, nodename string, fix bool) ( func (c *Calcium) doGetNodeResource(ctx context.Context, nodename string, fix bool) (*types.NodeResource, error) { var nr *types.NodeResource - return nr, c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { + return nr, c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { workloads, err := c.ListNodeWorkloads(ctx, node.Name, nil) if err != nil { return err @@ -237,10 +237,18 @@ func (c *Calcium) doRemapResourceAndLog(ctx context.Context, logger log.Fields, log.Debugf(ctx, "[doRemapResourceAndLog] remap node %s", node.Name) ctx, cancel := context.WithTimeout(utils.InheritTracingInfo(ctx, context.TODO()), c.config.GlobalTimeout) defer cancel() - logger = logger.WithField("Calcium", "doRemapResourceAndLog").WithField("nodename", node.Name) - if ch, err := c.remapResource(ctx, node); logger.Err(ctx, err) == nil { - for msg := range ch { - logger.WithField("id", msg.ID).Err(ctx, msg.Error) // nolint:errcheck + + err := c.withNodeOperationLocked(ctx, node.Name, func(ctx context.Context, node *types.Node) error { + logger = logger.WithField("Calcium", "doRemapResourceAndLog").WithField("nodename", node.Name) + if ch, err := c.remapResource(ctx, node); logger.Err(ctx, err) == nil { + for msg := range ch { + logger.WithField("id", msg.ID).Err(ctx, msg.Error) // nolint:errcheck + } } + return nil + }) + + if err != nil { + log.Errorf(ctx, 
"[doRemapResourceAndLog] remap node %s failed, err: %v", node.Name, err) } } diff --git a/cluster/cluster.go b/cluster/cluster.go index 2d9a8b108..2d2e6d1e8 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -30,8 +30,10 @@ const ( WorkloadRestart = "restart" // WorkloadLock for lock workload WorkloadLock = "clock_%s" - // NodeLock for lock node - NodeLock = "cnode_%s_%s" + // NodeResourceLock for lock node resource + NodeResourceLock = "cnode_%s_%s" + // NodeOperationLock for lock node operation + NodeOperationLock = "cnode_op_%s_%s" ) // Cluster define all interface