diff --git a/cluster/calcium/capacity.go b/cluster/calcium/capacity.go index c686103fc..d7ef747d3 100644 --- a/cluster/calcium/capacity.go +++ b/cluster/calcium/capacity.go @@ -21,7 +21,7 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio NodeCapacities: map[string]int{}, } - return msg, c.withNodesResourceLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) error { + return msg, c.withNodesPodLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) error { if opts.DeployStrategy != strategy.Dummy { if _, msg.NodeCapacities, err = c.doAllocResource(ctx, nodeMap, opts); err != nil { logger.Errorf(ctx, "[Calcium.CalculateCapacity] doAllocResource failed: %+v", err) diff --git a/cluster/calcium/create.go b/cluster/calcium/create.go index ec5bc3aca..5b66488fb 100644 --- a/cluster/calcium/create.go +++ b/cluster/calcium/create.go @@ -97,7 +97,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio ch <- &types.CreateWorkloadMessage{Error: logger.Err(ctx, err)} } }() - return c.withNodesResourceLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) (err error) { + return c.withNodesPodLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) (err error) { // calculate plans if plans, deployMap, err = c.doAllocResource(ctx, nodeMap, opts); err != nil { return err @@ -138,7 +138,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio return } for nodename, rollbackIndices := range rollbackMap { - if e := c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { + if e := c.withNodePodLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { for _, plan := range plans { plan.RollbackChangesOnNode(node, rollbackIndices...) // nolint:scopelint } diff --git a/cluster/calcium/dissociate.go b/cluster/calcium/dissociate.go index be16bf066..4e3abef07 100644 --- a/cluster/calcium/dissociate.go +++ b/cluster/calcium/dissociate.go @@ -25,7 +25,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t defer close(ch) for nodename, workloadIDs := range nodeWorkloadGroup { - if err := c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { + if err := c.withNodePodLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { for _, workloadID := range workloadIDs { // nolint:scopelint msg := &types.DissociateWorkloadMessage{WorkloadID: workloadID} // nolint:scopelint if err := c.withWorkloadLocked(ctx, workloadID, func(ctx context.Context, workload *types.Workload) error { diff --git a/cluster/calcium/lock.go b/cluster/calcium/lock.go index f1fb7228d..d64b9507f 100644 --- a/cluster/calcium/lock.go +++ b/cluster/calcium/lock.go @@ -31,7 +31,6 @@ func (c *Calcium) doLock(ctx context.Context, name string, timeout time.Duration }() rCtx, err = lock.Lock(ctx) return lock, rCtx, errors.WithStack(err) - } func (c *Calcium) doUnlock(ctx context.Context, lock lock.DistributedLock, msg string) error { @@ -65,27 +64,6 @@ func (c *Calcium) withWorkloadLocked(ctx context.Context, id string, f func(cont }) } -func (c *Calcium) withNodeResourceLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error { - return c.withNodeLocked(ctx, nodename, cluster.NodeResourceLock, f) -} - -func (c *Calcium) withNodeOperationLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error { - return c.withNodeLocked(ctx, nodename, cluster.NodeOperationLock, f) -} - -func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, lockKeyPattern string, f func(context.Context, *types.Node) error) error { - nf := types.NodeFilter{ - Includes: []string{nodename}, - All: true, - } - return c.withNodesLocked(ctx, nf, lockKeyPattern, func(ctx context.Context, nodes map[string]*types.Node) error { - if n, ok := nodes[nodename]; ok { - return f(ctx, n) - } - return errors.WithStack(types.ErrNodeNotExists) - }) -} - func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(context.Context, map[string]*types.Workload) error) error { workloads := map[string]*types.Workload{} locks := map[string]lock.DistributedLock{} @@ -116,24 +94,54 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func( return f(ctx, workloads) } +func (c *Calcium) withNodePodLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error { + nf := types.NodeFilter{ + Includes: []string{nodename}, + All: true, + } + return c.withNodesPodLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error { + if n, ok := nodes[nodename]; ok { + return f(ctx, n) + } + return errors.WithStack(types.ErrNodeNotExists) + }) +} + +func (c *Calcium) withNodeOperationLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error { + nf := types.NodeFilter{ + Includes: []string{nodename}, + All: true, + } + return c.withNodesOperationLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error { + if n, ok := nodes[nodename]; ok { + return f(ctx, n) + } + return errors.WithStack(types.ErrNodeNotExists) + }) +} + func (c *Calcium) withNodesOperationLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error { // nolint - return c.withNodesLocked(ctx, nf, cluster.NodeOperationLock, f) + genKey := func(node *types.Node) string { + return fmt.Sprintf(cluster.NodeOperationLock, node.Podname, node.Name) + } + return c.withNodesLocked(ctx, nf, genKey, f) } -func (c *Calcium) withNodesResourceLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error { - return c.withNodesLocked(ctx, nf, cluster.NodeResourceLock, f) +func (c *Calcium) withNodesPodLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error { + genKey := func(node *types.Node) string { + return fmt.Sprintf(cluster.PodLock, node.Podname) + } + return c.withNodesLocked(ctx, nf, genKey, f) } -// withNodesLocked will using NodeFilter `nf` to filter nodes -// and lock the corresponding nodes for the callback function `f` to use -func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, lockKeyPattern string, f func(context.Context, map[string]*types.Node) error) error { - nodenames := []string{} +func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, genKey func(*types.Node) string, f func(context.Context, map[string]*types.Node) error) error { nodes := map[string]*types.Node{} locks := map[string]lock.DistributedLock{} - defer log.Debugf(ctx, "[withNodesLocked] Nodes %+v unlocked", nf) + lockKeys := []string{} defer func() { - utils.Reverse(nodenames) - c.doUnlockAll(utils.InheritTracingInfo(ctx, context.TODO()), locks, nodenames...) + utils.Reverse(lockKeys) + c.doUnlockAll(utils.InheritTracingInfo(ctx, context.TODO()), locks, lockKeys...) + log.Debugf(ctx, "[withNodesLocked] keys %v unlocked", lockKeys) }() ns, err := c.filterNodes(ctx, nf) @@ -143,19 +151,23 @@ func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, lock var lock lock.DistributedLock for _, n := range ns { - lock, ctx, err = c.doLock(ctx, fmt.Sprintf(lockKeyPattern, n.Podname, n.Name), c.config.LockTimeout) - if err != nil { - return err + key := genKey(n) + if _, ok := locks[key]; !ok { + lock, ctx, err = c.doLock(ctx, key, c.config.LockTimeout) + if err != nil { + return err + } + log.Debugf(ctx, "[withNodesLocked] key %s locked", key) + locks[key] = lock + lockKeys = append(lockKeys, key) } - log.Debugf(ctx, "[withNodesLocked] Node %s locked", n.Name) - locks[n.Name] = lock + // refresh node node, err := c.GetNode(ctx, n.Name) if err != nil { return err } nodes[n.Name] = node - nodenames = append(nodenames, n.Name) } return f(ctx, nodes) } diff --git a/cluster/calcium/lock_test.go b/cluster/calcium/lock_test.go index 80cdd21cf..82afc3221 100644 --- a/cluster/calcium/lock_test.go +++ b/cluster/calcium/lock_test.go @@ -116,7 +116,7 @@ func TestWithWorkloadLocked(t *testing.T) { assert.NoError(t, err) } -func TestWithNodesResourceLocked(t *testing.T) { +func TestWithNodesPodLocked(t *testing.T) { c := NewTestCluster() ctx := context.Background() store := &storemocks.Store{} @@ -128,17 +128,18 @@ func TestWithNodesResourceLocked(t *testing.T) { Labels: map[string]string{ "eru": "1", }, + Podname: "test", }, Available: true, } // failed by list nodes store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, types.ErrNoETCD).Once() - err := c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil }) + err := c.withNodesPodLocked(ctx, types.NodeFilter{Podname: "test", All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil }) assert.Error(t, err) store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil).Once() // failed by filter var ns map[string]*types.Node - err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Labels: map[string]string{"eru": "2"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { + err = c.withNodesPodLocked(ctx, types.NodeFilter{Podname: "test", Labels: map[string]string{"eru": "2"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { ns = nodes return nil }) @@ -147,7 +148,7 @@ func TestWithNodesResourceLocked(t *testing.T) { store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil) // failed by getnode store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once() - err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil }) + err = c.withNodesPodLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil }) assert.Error(t, err) store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil).Once() // failed by lock @@ -156,23 +157,23 @@ func TestWithNodesResourceLocked(t *testing.T) { lock.On("Unlock", mock.Anything).Return(nil) // failed to get lock lock.On("Lock", mock.Anything).Return(context.TODO(), types.ErrNoETCD).Once() - err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil }) + err = c.withNodesPodLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil }) assert.Error(t, err) lock.On("Lock", mock.Anything).Return(context.TODO(), nil) // failed by get locked node store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once() - err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil }) + err = c.withNodesPodLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil }) assert.Error(t, err) store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil) // success - err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { + err = c.withNodesPodLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { assert.Len(t, nodes, 1) return nil }) assert.NoError(t, err) } -func TestWithNodeResourceLocked(t *testing.T) { +func TestWithNodePodLocked(t *testing.T) { c := NewTestCluster() ctx := context.Background() store := &storemocks.Store{} @@ -184,6 +185,7 @@ func TestWithNodeResourceLocked(t *testing.T) { Labels: map[string]string{ "eru": "1", }, + Podname: "test", }, Available: true, } @@ -194,11 +196,11 @@ func TestWithNodeResourceLocked(t *testing.T) { lock.On("Lock", mock.Anything).Return(context.TODO(), nil) // failed by get locked node store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once() - err := c.withNodeResourceLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { return nil }) + err := c.withNodePodLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { return nil }) assert.Error(t, err) store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil) // success - err = c.withNodeResourceLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { + err = c.withNodePodLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { assert.Equal(t, node.Name, node1.Name) return nil }) @@ -293,7 +295,7 @@ func TestWithNodeOperationLocked(t *testing.T) { }) assert.NoError(t, err) - err = c.withNodeResourceLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { + err = c.withNodePodLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { return c.withNodeOperationLocked(ctx, node.Name, func(ctx context.Context, node *types.Node) error { assert.Equal(t, node.Name, node1.Name) return nil diff --git a/cluster/calcium/node.go b/cluster/calcium/node.go index f45b65246..3dd096a9c 100644 --- a/cluster/calcium/node.go +++ b/cluster/calcium/node.go @@ -30,7 +30,7 @@ func (c *Calcium) RemoveNode(ctx context.Context, nodename string) error { if nodename == "" { return logger.Err(ctx, errors.WithStack(types.ErrEmptyNodeName)) } - return c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { + return c.withNodePodLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { ws, err := c.ListNodeWorkloads(ctx, node.Name, nil) if err != nil { return logger.Err(ctx, err) @@ -107,7 +107,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ return nil, logger.Err(ctx, err) } var n *types.Node - return n, c.withNodeResourceLocked(ctx, opts.Nodename, func(ctx context.Context, node *types.Node) error { + return n, c.withNodePodLocked(ctx, opts.Nodename, func(ctx context.Context, node *types.Node) error { logger.Infof(ctx, "set node") opts.Normalize(node) n = node diff --git a/cluster/calcium/pod.go b/cluster/calcium/pod.go index 46e98a333..677deff2e 100644 --- a/cluster/calcium/pod.go +++ b/cluster/calcium/pod.go @@ -30,7 +30,7 @@ func (c *Calcium) RemovePod(ctx context.Context, podname string) error { Podname: podname, All: true, } - return c.withNodesResourceLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error { + return c.withNodesPodLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error { // TODO dissociate workload to node // TODO should remove node first return logger.Err(ctx, errors.WithStack(c.store.RemovePod(ctx, podname))) diff --git a/cluster/calcium/realloc.go b/cluster/calcium/realloc.go index f8fb8510d..963ef5b3b 100644 --- a/cluster/calcium/realloc.go +++ b/cluster/calcium/realloc.go @@ -21,7 +21,7 @@ func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOption if err != nil { return } - return c.withNodeResourceLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error { + return c.withNodePodLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error { return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error { rrs, err := resources.MakeRequests( diff --git a/cluster/calcium/remove.go b/cluster/calcium/remove.go index d40a5f118..2349c8817 100644 --- a/cluster/calcium/remove.go +++ b/cluster/calcium/remove.go @@ -33,7 +33,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool) utils.SentryGo(func(nodename string, workloadIDs []string) func() { return func() { defer wg.Done() - if err := c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { + if err := c.withNodePodLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { for _, workloadID := range workloadIDs { ret := &types.RemoveWorkloadMessage{WorkloadID: workloadID, Success: true, Hook: []*bytes.Buffer{}} if err := c.withWorkloadLocked(ctx, workloadID, func(ctx context.Context, workload *types.Workload) error { diff --git a/cluster/calcium/resource.go b/cluster/calcium/resource.go index a2ec67f28..3bb0f055e 100644 --- a/cluster/calcium/resource.go +++ b/cluster/calcium/resource.go @@ -64,7 +64,7 @@ func (c *Calcium) NodeResource(ctx context.Context, nodename string, fix bool) ( func (c *Calcium) doGetNodeResource(ctx context.Context, nodename string, fix bool) (*types.NodeResource, error) { var nr *types.NodeResource - return nr, c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { + return nr, c.withNodePodLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error { workloads, err := c.ListNodeWorkloads(ctx, node.Name, nil) if err != nil { return err diff --git a/cluster/cluster.go b/cluster/cluster.go index e25aec5cc..8ace051e8 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -30,8 +30,8 @@ const ( WorkloadRestart = "restart" // WorkloadLock for lock workload WorkloadLock = "clock_%s" - // NodeResourceLock for lock node resource - NodeResourceLock = "cnode_%s_%s" + // PodLock for lock pod + PodLock = "plock_%s" // NodeOperationLock for lock node operation NodeOperationLock = "cnode_op_%s_%s" ) diff --git a/go.mod b/go.mod index d12b43312..85d93c9de 100644 --- a/go.mod +++ b/go.mod @@ -34,7 +34,7 @@ require ( go.etcd.io/etcd/tests/v3 v3.5.0 go.uber.org/automaxprocs v1.3.0 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 - golang.org/x/exp v0.0.0-20220323204016-c86f0da35e87 + golang.org/x/exp v0.0.0-20220328175248-053ad81199eb golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f golang.org/x/sync v0.0.0-20210220032951-036812b2e83c google.golang.org/grpc v1.40.0 diff --git a/go.sum b/go.sum index a4a047247..ef5f8d531 100644 --- a/go.sum +++ b/go.sum @@ -665,6 +665,8 @@ golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE golang.org/x/exp v0.0.0-20200908183739-ae8ad444f925/go.mod h1:1phAWC201xIgDyaFpmDeZkgf70Q4Pd/CNqfRtVPtxNw= golang.org/x/exp v0.0.0-20220323204016-c86f0da35e87 h1:pFVxvJFSIGjuRLaw0mTTDfxn/AMeSYzY6Y/Hr7adkVU= golang.org/x/exp v0.0.0-20220323204016-c86f0da35e87/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= +golang.org/x/exp v0.0.0-20220328175248-053ad81199eb h1:pC9Okm6BVmxEw76PUu0XUbOTQ92JX11hfvqTjAV3qxM= +golang.org/x/exp v0.0.0-20220328175248-053ad81199eb/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=