Commit 7183711

change the node lock to the pod level (#573)
DuodenumL authored Apr 4, 2022
1 parent 21ca429 commit 7183711
Showing 13 changed files with 78 additions and 62 deletions.
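
In short, this commit replaces the per-node resource lock (key pattern cnode_%s_%s, one key per pod/node pair) with a single pod-level lock (key pattern plock_%s), and withNodesLocked now takes a key-generator callback so that nodes sharing a key are locked only once. The small Go demo below only illustrates how the key shapes change, using the patterns defined in cluster/cluster.go in this diff; the pod and node names are made up for illustration.

package main

import "fmt"

const (
	// Key patterns as they appear in cluster/cluster.go in this commit.
	oldNodeResourceLock = "cnode_%s_%s"    // removed: one key per (pod, node)
	newPodLock          = "plock_%s"       // added: one key per pod
	nodeOperationLock   = "cnode_op_%s_%s" // unchanged: still per node
)

func main() {
	pod := "testpod"                      // hypothetical pod name
	nodes := []string{"node-1", "node-2"} // hypothetical node names

	for _, n := range nodes {
		// Before: every node produced a distinct resource-lock key.
		fmt.Println("old resource lock key:", fmt.Sprintf(oldNodeResourceLock, pod, n))
		// After: both nodes map to the same pod-level key.
		fmt.Println("new pod lock key:     ", fmt.Sprintf(newPodLock, pod))
		// Operation locks keep their per-node granularity.
		fmt.Println("operation lock key:   ", fmt.Sprintf(nodeOperationLock, pod, n))
	}
}
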
2 changes: 1 addition & 1 deletion cluster/calcium/capacity.go
@@ -21,7 +21,7 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio
NodeCapacities: map[string]int{},
}

return msg, c.withNodesResourceLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) error {
return msg, c.withNodesPodLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) error {
if opts.DeployStrategy != strategy.Dummy {
if _, msg.NodeCapacities, err = c.doAllocResource(ctx, nodeMap, opts); err != nil {
logger.Errorf(ctx, "[Calcium.CalculateCapacity] doAllocResource failed: %+v", err)
4 changes: 2 additions & 2 deletions cluster/calcium/create.go
@@ -97,7 +97,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
ch <- &types.CreateWorkloadMessage{Error: logger.Err(ctx, err)}
}
}()
return c.withNodesResourceLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) (err error) {
return c.withNodesPodLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) (err error) {
// calculate plans
if plans, deployMap, err = c.doAllocResource(ctx, nodeMap, opts); err != nil {
return err
@@ -138,7 +138,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
return
}
for nodename, rollbackIndices := range rollbackMap {
if e := c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
if e := c.withNodePodLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
for _, plan := range plans {
plan.RollbackChangesOnNode(node, rollbackIndices...) // nolint:scopelint
}
2 changes: 1 addition & 1 deletion cluster/calcium/dissociate.go
@@ -25,7 +25,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
defer close(ch)

for nodename, workloadIDs := range nodeWorkloadGroup {
if err := c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
if err := c.withNodePodLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
for _, workloadID := range workloadIDs { // nolint:scopelint
msg := &types.DissociateWorkloadMessage{WorkloadID: workloadID} // nolint:scopelint
if err := c.withWorkloadLocked(ctx, workloadID, func(ctx context.Context, workload *types.Workload) error {
88 changes: 50 additions & 38 deletions cluster/calcium/lock.go
@@ -31,7 +31,6 @@ func (c *Calcium) doLock(ctx context.Context, name string, timeout time.Duration
}()
rCtx, err = lock.Lock(ctx)
return lock, rCtx, errors.WithStack(err)

}

func (c *Calcium) doUnlock(ctx context.Context, lock lock.DistributedLock, msg string) error {
@@ -65,27 +64,6 @@ func (c *Calcium) withWorkloadLocked(ctx context.Context, id string, f func(cont
})
}

func (c *Calcium) withNodeResourceLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error {
return c.withNodeLocked(ctx, nodename, cluster.NodeResourceLock, f)
}

func (c *Calcium) withNodeOperationLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error {
return c.withNodeLocked(ctx, nodename, cluster.NodeOperationLock, f)
}

func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, lockKeyPattern string, f func(context.Context, *types.Node) error) error {
nf := types.NodeFilter{
Includes: []string{nodename},
All: true,
}
return c.withNodesLocked(ctx, nf, lockKeyPattern, func(ctx context.Context, nodes map[string]*types.Node) error {
if n, ok := nodes[nodename]; ok {
return f(ctx, n)
}
return errors.WithStack(types.ErrNodeNotExists)
})
}

func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(context.Context, map[string]*types.Workload) error) error {
workloads := map[string]*types.Workload{}
locks := map[string]lock.DistributedLock{}
@@ -116,24 +94,54 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(
return f(ctx, workloads)
}

func (c *Calcium) withNodePodLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error {
nf := types.NodeFilter{
Includes: []string{nodename},
All: true,
}
return c.withNodesPodLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error {
if n, ok := nodes[nodename]; ok {
return f(ctx, n)
}
return errors.WithStack(types.ErrNodeNotExists)
})
}

func (c *Calcium) withNodeOperationLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error {
nf := types.NodeFilter{
Includes: []string{nodename},
All: true,
}
return c.withNodesOperationLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error {
if n, ok := nodes[nodename]; ok {
return f(ctx, n)
}
return errors.WithStack(types.ErrNodeNotExists)
})
}

func (c *Calcium) withNodesOperationLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error { // nolint
return c.withNodesLocked(ctx, nf, cluster.NodeOperationLock, f)
genKey := func(node *types.Node) string {
return fmt.Sprintf(cluster.NodeOperationLock, node.Podname, node.Name)
}
return c.withNodesLocked(ctx, nf, genKey, f)
}

func (c *Calcium) withNodesResourceLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error {
return c.withNodesLocked(ctx, nf, cluster.NodeResourceLock, f)
func (c *Calcium) withNodesPodLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error {
genKey := func(node *types.Node) string {
return fmt.Sprintf(cluster.PodLock, node.Podname)
}
return c.withNodesLocked(ctx, nf, genKey, f)
}

// withNodesLocked uses NodeFilter `nf` to filter nodes
// and lock the corresponding nodes for the callback function `f` to use
func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, lockKeyPattern string, f func(context.Context, map[string]*types.Node) error) error {
nodenames := []string{}
func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, genKey func(*types.Node) string, f func(context.Context, map[string]*types.Node) error) error {
nodes := map[string]*types.Node{}
locks := map[string]lock.DistributedLock{}
defer log.Debugf(ctx, "[withNodesLocked] Nodes %+v unlocked", nf)
lockKeys := []string{}
defer func() {
utils.Reverse(nodenames)
c.doUnlockAll(utils.InheritTracingInfo(ctx, context.TODO()), locks, nodenames...)
utils.Reverse(lockKeys)
c.doUnlockAll(utils.InheritTracingInfo(ctx, context.TODO()), locks, lockKeys...)
log.Debugf(ctx, "[withNodesLocked] keys %v unlocked", lockKeys)
}()

ns, err := c.filterNodes(ctx, nf)
@@ -143,19 +151,23 @@ func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, lock

var lock lock.DistributedLock
for _, n := range ns {
lock, ctx, err = c.doLock(ctx, fmt.Sprintf(lockKeyPattern, n.Podname, n.Name), c.config.LockTimeout)
if err != nil {
return err
key := genKey(n)
if _, ok := locks[key]; !ok {
lock, ctx, err = c.doLock(ctx, key, c.config.LockTimeout)
if err != nil {
return err
}
log.Debugf(ctx, "[withNodesLocked] key %s locked", key)
locks[key] = lock
lockKeys = append(lockKeys, key)
}
log.Debugf(ctx, "[withNodesLocked] Node %s locked", n.Name)
locks[n.Name] = lock

// refresh node
node, err := c.GetNode(ctx, n.Name)
if err != nil {
return err
}
nodes[n.Name] = node
nodenames = append(nodenames, n.Name)
}
return f(ctx, nodes)
}
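
To make the reworked withNodesLocked above easier to follow: it now receives a genKey callback instead of a printf-style key pattern, skips keys it has already locked (so every node in the same pod collapses to a single plock_<podname> acquisition), records keys in acquisition order, and unlocks them in reverse order on exit. The sketch below mirrors that shape with stand-in types so it compiles on its own; the Locker interface, Node struct, and acquire helper are assumptions for illustration, not the eru-core API.

package main

import (
	"context"
	"fmt"
)

// Locker stands in for the project's lock.DistributedLock.
type Locker interface {
	Lock(ctx context.Context) (context.Context, error)
	Unlock(ctx context.Context) error
}

// Node carries just the fields the key generators need.
type Node struct{ Podname, Name string }

// withNodesLockedSketch locks one key per distinct genKey(node) value,
// calls f with the collected nodes, then unlocks in reverse order.
func withNodesLockedSketch(
	ctx context.Context,
	nodes []*Node,
	genKey func(*Node) string,
	acquire func(ctx context.Context, key string) (Locker, context.Context, error),
	f func(context.Context, map[string]*Node) error,
) error {
	locks := map[string]Locker{}
	lockKeys := []string{}
	defer func() {
		// Release in reverse acquisition order, like utils.Reverse(lockKeys) above.
		for i := len(lockKeys) - 1; i >= 0; i-- {
			_ = locks[lockKeys[i]].Unlock(context.TODO())
		}
	}()

	collected := map[string]*Node{}
	for _, n := range nodes {
		key := genKey(n) // e.g. "plock_" + n.Podname for the pod-level lock
		if _, ok := locks[key]; !ok { // only lock each key once
			l, lockedCtx, err := acquire(ctx, key)
			if err != nil {
				return err
			}
			ctx = lockedCtx
			locks[key] = l
			lockKeys = append(lockKeys, key)
		}
		collected[n.Name] = n
	}
	return f(ctx, collected)
}

// fakeLock is a trivial Locker used only to run the sketch.
type fakeLock struct{ key string }

func (l *fakeLock) Lock(ctx context.Context) (context.Context, error) { return ctx, nil }
func (l *fakeLock) Unlock(ctx context.Context) error {
	fmt.Println("unlock", l.key)
	return nil
}

func main() {
	nodes := []*Node{{Podname: "testpod", Name: "node-1"}, {Podname: "testpod", Name: "node-2"}}
	genKey := func(n *Node) string { return fmt.Sprintf("plock_%s", n.Podname) }
	acquire := func(ctx context.Context, key string) (Locker, context.Context, error) {
		fmt.Println("lock", key)
		return &fakeLock{key: key}, ctx, nil
	}
	_ = withNodesLockedSketch(context.Background(), nodes, genKey, acquire,
		func(ctx context.Context, m map[string]*Node) error {
			fmt.Println("holding one pod lock for", len(m), "nodes")
			return nil
		})
	// The pod lock is acquired and released exactly once for both nodes.
}
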
24 changes: 13 additions & 11 deletions cluster/calcium/lock_test.go
@@ -116,7 +116,7 @@ func TestWithWorkloadLocked(t *testing.T) {
assert.NoError(t, err)
}

func TestWithNodesResourceLocked(t *testing.T) {
func TestWithNodesPodLocked(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
store := &storemocks.Store{}
@@ -128,17 +128,18 @@ func TestWithNodesResourceLocked(t *testing.T) {
Labels: map[string]string{
"eru": "1",
},
Podname: "test",
},
Available: true,
}
// failed by list nodes
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, types.ErrNoETCD).Once()
err := c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
err := c.withNodesPodLocked(ctx, types.NodeFilter{Podname: "test", All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil).Once()
// failed by filter
var ns map[string]*types.Node
err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Labels: map[string]string{"eru": "2"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
err = c.withNodesPodLocked(ctx, types.NodeFilter{Podname: "test", Labels: map[string]string{"eru": "2"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
ns = nodes
return nil
})
@@ -147,7 +148,7 @@ func TestWithNodesResourceLocked(t *testing.T) {
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil)
// failed by getnode
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
err = c.withNodesPodLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil).Once()
// failed by lock
@@ -156,23 +157,23 @@ func TestWithNodesResourceLocked(t *testing.T) {
lock.On("Unlock", mock.Anything).Return(nil)
// failed to get lock
lock.On("Lock", mock.Anything).Return(context.TODO(), types.ErrNoETCD).Once()
err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
err = c.withNodesPodLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
// failed by get locked node
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
err = c.withNodesPodLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil)
// success
err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
err = c.withNodesPodLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
assert.Len(t, nodes, 1)
return nil
})
assert.NoError(t, err)
}

func TestWithNodeResourceLocked(t *testing.T) {
func TestWithNodePodLocked(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
store := &storemocks.Store{}
@@ -184,6 +185,7 @@ func TestWithNodeResourceLocked(t *testing.T) {
Labels: map[string]string{
"eru": "1",
},
Podname: "test",
},
Available: true,
}
@@ -194,11 +196,11 @@ func TestWithNodeResourceLocked(t *testing.T) {
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
// failed by get locked node
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err := c.withNodeResourceLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { return nil })
err := c.withNodePodLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil)
// success
err = c.withNodeResourceLocked(ctx, "test", func(ctx context.Context, node *types.Node) error {
err = c.withNodePodLocked(ctx, "test", func(ctx context.Context, node *types.Node) error {
assert.Equal(t, node.Name, node1.Name)
return nil
})
@@ -293,7 +295,7 @@ func TestWithNodeOperationLocked(t *testing.T) {
})
assert.NoError(t, err)

err = c.withNodeResourceLocked(ctx, "test", func(ctx context.Context, node *types.Node) error {
err = c.withNodePodLocked(ctx, "test", func(ctx context.Context, node *types.Node) error {
return c.withNodeOperationLocked(ctx, node.Name, func(ctx context.Context, node *types.Node) error {
assert.Equal(t, node.Name, node1.Name)
return nil
4 changes: 2 additions & 2 deletions cluster/calcium/node.go
@@ -30,7 +30,7 @@ func (c *Calcium) RemoveNode(ctx context.Context, nodename string) error {
if nodename == "" {
return logger.Err(ctx, errors.WithStack(types.ErrEmptyNodeName))
}
return c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
return c.withNodePodLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
ws, err := c.ListNodeWorkloads(ctx, node.Name, nil)
if err != nil {
return logger.Err(ctx, err)
@@ -107,7 +107,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
return nil, logger.Err(ctx, err)
}
var n *types.Node
return n, c.withNodeResourceLocked(ctx, opts.Nodename, func(ctx context.Context, node *types.Node) error {
return n, c.withNodePodLocked(ctx, opts.Nodename, func(ctx context.Context, node *types.Node) error {
logger.Infof(ctx, "set node")
opts.Normalize(node)
n = node
2 changes: 1 addition & 1 deletion cluster/calcium/pod.go
@@ -30,7 +30,7 @@ func (c *Calcium) RemovePod(ctx context.Context, podname string) error {
Podname: podname,
All: true,
}
return c.withNodesResourceLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error {
return c.withNodesPodLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error {
// TODO dissociate workload to node
// TODO should remove node first
return logger.Err(ctx, errors.WithStack(c.store.RemovePod(ctx, podname)))
2 changes: 1 addition & 1 deletion cluster/calcium/realloc.go
@@ -21,7 +21,7 @@ func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOption
if err != nil {
return
}
return c.withNodeResourceLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error {
return c.withNodePodLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error {

return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error {
rrs, err := resources.MakeRequests(
2 changes: 1 addition & 1 deletion cluster/calcium/remove.go
@@ -33,7 +33,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool)
utils.SentryGo(func(nodename string, workloadIDs []string) func() {
return func() {
defer wg.Done()
if err := c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
if err := c.withNodePodLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
for _, workloadID := range workloadIDs {
ret := &types.RemoveWorkloadMessage{WorkloadID: workloadID, Success: true, Hook: []*bytes.Buffer{}}
if err := c.withWorkloadLocked(ctx, workloadID, func(ctx context.Context, workload *types.Workload) error {
2 changes: 1 addition & 1 deletion cluster/calcium/resource.go
@@ -64,7 +64,7 @@ func (c *Calcium) NodeResource(ctx context.Context, nodename string, fix bool) (

func (c *Calcium) doGetNodeResource(ctx context.Context, nodename string, fix bool) (*types.NodeResource, error) {
var nr *types.NodeResource
return nr, c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
return nr, c.withNodePodLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
workloads, err := c.ListNodeWorkloads(ctx, node.Name, nil)
if err != nil {
return err
4 changes: 2 additions & 2 deletions cluster/cluster.go
@@ -30,8 +30,8 @@ const (
WorkloadRestart = "restart"
// WorkloadLock for lock workload
WorkloadLock = "clock_%s"
// NodeResourceLock for lock node resource
NodeResourceLock = "cnode_%s_%s"
// PodLock for lock pod
PodLock = "plock_%s"
// NodeOperationLock for lock node operation
NodeOperationLock = "cnode_op_%s_%s"
)
2 changes: 1 addition & 1 deletion go.mod
@@ -34,7 +34,7 @@ require (
go.etcd.io/etcd/tests/v3 v3.5.0
go.uber.org/automaxprocs v1.3.0
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519
golang.org/x/exp v0.0.0-20220323204016-c86f0da35e87
golang.org/x/exp v0.0.0-20220328175248-053ad81199eb
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/grpc v1.40.0
2 changes: 2 additions & 0 deletions go.sum
@@ -665,6 +665,8 @@ golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE
golang.org/x/exp v0.0.0-20200908183739-ae8ad444f925/go.mod h1:1phAWC201xIgDyaFpmDeZkgf70Q4Pd/CNqfRtVPtxNw=
golang.org/x/exp v0.0.0-20220323204016-c86f0da35e87 h1:pFVxvJFSIGjuRLaw0mTTDfxn/AMeSYzY6Y/Hr7adkVU=
golang.org/x/exp v0.0.0-20220323204016-c86f0da35e87/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/exp v0.0.0-20220328175248-053ad81199eb h1:pC9Okm6BVmxEw76PUu0XUbOTQ92JX11hfvqTjAV3qxM=
golang.org/x/exp v0.0.0-20220328175248-053ad81199eb/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=