Skip to content

Commit

Permalink
add new types of node lock
Browse files Browse the repository at this point in the history
  • Loading branch information
DuodenumL committed Mar 8, 2022
1 parent 339e5f0 commit e915e14
Show file tree
Hide file tree
Showing 14 changed files with 169 additions and 45 deletions.
2 changes: 1 addition & 1 deletion cluster/calcium/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio
NodeCapacities: map[string]int{},
}

return msg, c.withNodesLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) error {
return msg, c.withNodesResourceLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) error {
if opts.DeployStrategy != strategy.Dummy {
if _, msg.NodeCapacities, err = c.doAllocResource(ctx, nodeMap, opts); err != nil {
logger.Errorf(ctx, "[Calcium.CalculateCapacity] doAllocResource failed: %+v", err)
Expand Down
12 changes: 4 additions & 8 deletions cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
ch <- &types.CreateWorkloadMessage{Error: logger.Err(ctx, err)}
}
}()
return c.withNodesLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) (err error) {
return c.withNodesResourceLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) (err error) {
// calculate plans
if plans, deployMap, err = c.doAllocResource(ctx, nodeMap, opts); err != nil {
return err
Expand Down Expand Up @@ -136,7 +136,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
return
}
for nodename, rollbackIndices := range rollbackMap {
if e := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
if e := c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
for _, plan := range plans {
plan.RollbackChangesOnNode(node, rollbackIndices...) // nolint:scopelint
}
Expand Down Expand Up @@ -249,12 +249,8 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
// remap 就不搞进事务了吧, 回滚代价太大了
// 放任 remap 失败的后果是, share pool 没有更新, 这个后果姑且认为是可以承受的
// 而且 remap 是一个幂等操作, 就算这次 remap 失败, 下次 remap 也能收敛到正确到状态
if err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
c.doRemapResourceAndLog(ctx, logger, node)
return nil
}); err != nil {
logger.Errorf(ctx, "failed to lock node to remap: %v", err)
}
go c.doRemapResourceAndLog(ctx, logger, node)

return indices, err
}

Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/dissociate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
defer close(ch)

for nodename, workloadIDs := range nodeWorkloadGroup {
if err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
if err := c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
for _, workloadID := range workloadIDs { // nolint:scopelint
msg := &types.DissociateWorkloadMessage{WorkloadID: workloadID} // nolint:scopelint
if err := c.withWorkloadLocked(ctx, workloadID, func(ctx context.Context, workload *types.Workload) error {
Expand Down Expand Up @@ -57,7 +57,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
}
ch <- msg
}
c.doRemapResourceAndLog(ctx, logger, node)
go c.doRemapResourceAndLog(ctx, logger, node)
return nil
}); err != nil {
logger.WithField("nodename", nodename).Errorf(ctx, "failed to lock node: %+v", err)
Expand Down
3 changes: 3 additions & 0 deletions cluster/calcium/dissociate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package calcium
import (
"context"
"testing"
"time"

lockmocks "github.com/projecteru2/core/lock/mocks"
storemocks "github.com/projecteru2/core/store/mocks"
Expand Down Expand Up @@ -66,9 +67,11 @@ func TestDissociateWorkload(t *testing.T) {
for r := range ch {
assert.Error(t, r.Error)
}
time.Sleep(time.Second)
store.AssertExpectations(t)

store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(nil)

// success
ch, err = c.DissociateWorkload(ctx, []string{"c1"})
assert.NoError(t, err)
Expand Down
29 changes: 25 additions & 4 deletions cluster/calcium/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,25 @@ func (c *Calcium) withWorkloadLocked(ctx context.Context, id string, f func(cont
})
}

func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error {
func (c *Calcium) withNodeResourceLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error {
nf := types.NodeFilter{
Includes: []string{nodename},
All: true,
}
return c.withNodesLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error {
return c.withNodesResourceLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error {
if n, ok := nodes[nodename]; ok {
return f(ctx, n)
}
return errors.WithStack(types.ErrNodeNotExists)
})
}

func (c *Calcium) withNodeOperationLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error {
nf := types.NodeFilter{
Includes: []string{nodename},
All: true,
}
return c.withNodesOperationLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error {
if n, ok := nodes[nodename]; ok {
return f(ctx, n)
}
Expand Down Expand Up @@ -108,9 +121,17 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(
return f(ctx, workloads)
}

func (c *Calcium) withNodesOperationLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error {
return c.withNodesLocked(ctx, nf, cluster.NodeOperationLock, f)
}

func (c *Calcium) withNodesResourceLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error {
return c.withNodesLocked(ctx, nf, cluster.NodeResourceLock, f)
}

// withNodesLocked will using NodeFilter `nf` to filter nodes
// and lock the corresponding nodes for the callback function `f` to use
func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error {
func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, lockKeyPattern string, f func(context.Context, map[string]*types.Node) error) error {
nodenames := []string{}
nodes := map[string]*types.Node{}
locks := map[string]lock.DistributedLock{}
Expand All @@ -127,7 +148,7 @@ func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f fu

var lock lock.DistributedLock
for _, n := range ns {
lock, ctx, err = c.doLock(ctx, fmt.Sprintf(cluster.NodeLock, n.Podname, n.Name), c.config.LockTimeout)
lock, ctx, err = c.doLock(ctx, fmt.Sprintf(lockKeyPattern, n.Podname, n.Name), c.config.LockTimeout)
if err != nil {
return err
}
Expand Down
117 changes: 107 additions & 10 deletions cluster/calcium/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestWithWorkloadLocked(t *testing.T) {
assert.NoError(t, err)
}

func TestWithNodesLocked(t *testing.T) {
func TestWithNodesResourceLocked(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
store := &storemocks.Store{}
Expand All @@ -133,12 +133,12 @@ func TestWithNodesLocked(t *testing.T) {
}
// failed by list nodes
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, types.ErrNoETCD).Once()
err := c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
err := c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil).Once()
// failed by filter
var ns map[string]*types.Node
err = c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", Labels: map[string]string{"eru": "2"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Labels: map[string]string{"eru": "2"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
ns = nodes
return nil
})
Expand All @@ -147,7 +147,7 @@ func TestWithNodesLocked(t *testing.T) {
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil)
// failed by getnode
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err = c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil).Once()
// failed by lock
Expand All @@ -156,23 +156,23 @@ func TestWithNodesLocked(t *testing.T) {
lock.On("Unlock", mock.Anything).Return(nil)
// failed to get lock
lock.On("Lock", mock.Anything).Return(context.TODO(), types.ErrNoETCD).Once()
err = c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
// failed by get locked node
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err = c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil)
// success
err = c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
assert.Len(t, nodes, 1)
return nil
})
assert.NoError(t, err)
}

func TestWithNodeLocked(t *testing.T) {
func TestWithNodeResourceLocked(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
store := &storemocks.Store{}
Expand All @@ -194,13 +194,110 @@ func TestWithNodeLocked(t *testing.T) {
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
// failed by get locked node
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err := c.withNodeLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { return nil })
err := c.withNodeResourceLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil)
// success
err = c.withNodeLocked(ctx, "test", func(ctx context.Context, node *types.Node) error {
err = c.withNodeResourceLocked(ctx, "test", func(ctx context.Context, node *types.Node) error {
assert.Equal(t, node.Name, node1.Name)
return nil
})
assert.NoError(t, err)
}

func TestWithNodesOperationLocked(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
store := &storemocks.Store{}
c.store = store

node1 := &types.Node{
NodeMeta: types.NodeMeta{
Name: "test",
Labels: map[string]string{
"eru": "1",
},
},
Available: true,
}
// failed by list nodes
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, types.ErrNoETCD).Once()
err := c.withNodesOperationLocked(ctx, types.NodeFilter{Podname: "test", All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil).Once()
// failed by filter
var ns map[string]*types.Node
err = c.withNodesOperationLocked(ctx, types.NodeFilter{Podname: "test", Labels: map[string]string{"eru": "2"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
ns = nodes
return nil
})
assert.NoError(t, err)
assert.Empty(t, ns)
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil)
// failed by getnode
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err = c.withNodesOperationLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil).Once()
// failed by lock
lock := &lockmocks.DistributedLock{}
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
lock.On("Unlock", mock.Anything).Return(nil)
// failed to get lock
lock.On("Lock", mock.Anything).Return(context.TODO(), types.ErrNoETCD).Once()
err = c.withNodesOperationLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
// failed by get locked node
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err = c.withNodesOperationLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil)
// success
err = c.withNodesOperationLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
assert.Len(t, nodes, 1)
return nil
})
assert.NoError(t, err)
}

func TestWithNodeOperationLocked(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
store := &storemocks.Store{}
c.store = store

node1 := &types.Node{
NodeMeta: types.NodeMeta{
Name: "test",
Labels: map[string]string{
"eru": "1",
},
},
Available: true,
}
// failed by lock
lock := &lockmocks.DistributedLock{}
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
lock.On("Unlock", mock.Anything).Return(nil)
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
// failed by get locked node
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err := c.withNodeOperationLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil)
// success
err = c.withNodeOperationLocked(ctx, "test", func(ctx context.Context, node *types.Node) error {
assert.Equal(t, node.Name, node1.Name)
return nil
})
assert.NoError(t, err)

err = c.withNodeResourceLocked(ctx, "test", func(ctx context.Context, node *types.Node) error {
return c.withNodeOperationLocked(ctx, node.Name, func(ctx context.Context, node *types.Node) error {
assert.Equal(t, node.Name, node1.Name)
return nil
})
})
assert.NoError(t, err)
}
4 changes: 2 additions & 2 deletions cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (c *Calcium) RemoveNode(ctx context.Context, nodename string) error {
if nodename == "" {
return logger.Err(ctx, errors.WithStack(types.ErrEmptyNodeName))
}
return c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
return c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
ws, err := c.ListNodeWorkloads(ctx, node.Name, nil)
if err != nil {
return logger.Err(ctx, err)
Expand Down Expand Up @@ -91,7 +91,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
return nil, logger.Err(ctx, err)
}
var n *types.Node
return n, c.withNodeLocked(ctx, opts.Nodename, func(ctx context.Context, node *types.Node) error {
return n, c.withNodeResourceLocked(ctx, opts.Nodename, func(ctx context.Context, node *types.Node) error {
logger.Infof(ctx, "set node")
opts.Normalize(node)
n = node
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (c *Calcium) RemovePod(ctx context.Context, podname string) error {
Podname: podname,
All: true,
}
return c.withNodesLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error {
return c.withNodesResourceLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error {
// TODO dissociate workload to node
// TODO should remove node first
return logger.Err(ctx, errors.WithStack(c.store.RemovePod(ctx, podname)))
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/realloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOption
if err != nil {
return
}
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error {
return c.withNodeResourceLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error {

return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error {
rrs, err := resources.MakeRequests(
Expand Down Expand Up @@ -108,7 +108,7 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, node *types.Node, workloa
return
}

c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
go c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions cluster/calcium/realloc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package calcium
import (
"context"
"testing"
"time"

enginemocks "github.com/projecteru2/core/engine/mocks"
enginetypes "github.com/projecteru2/core/engine/types"
Expand Down Expand Up @@ -362,6 +363,7 @@ func TestReallocBindCpu(t *testing.T) {
err = c.ReallocResource(ctx, newReallocOptions("c5", 0.1, 2*int64(units.MiB), nil, types.TriFalse, types.TriKeep))
assert.NoError(t, err)
assert.Equal(t, 0, len(c5.ResourceMeta.CPU))
time.Sleep(time.Second)
store.AssertExpectations(t)

store.On("GetWorkload", mock.Anything, "c6").Return(c6, nil)
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
utils.SentryGo(func(nodename string, workloadIDs []string) func() {
return func() {
defer wg.Done()
if err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
if err := c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
for _, workloadID := range workloadIDs {
ret := &types.RemoveWorkloadMessage{WorkloadID: workloadID, Success: true, Hook: []*bytes.Buffer{}}
if err := c.withWorkloadLocked(ctx, workloadID, func(ctx context.Context, workload *types.Workload) error {
Expand Down Expand Up @@ -66,7 +66,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
}
ch <- ret
}
c.doRemapResourceAndLog(ctx, logger, node)
go c.doRemapResourceAndLog(ctx, logger, node)
return nil
}); err != nil {
logger.WithField("nodename", nodename).Errorf(ctx, "failed to lock node: %+v", err)
Expand Down
Loading

0 comments on commit e915e14

Please sign in to comment.