add new types of node lock #554

Merged · 1 commit · Mar 9, 2022
2 changes: 1 addition & 1 deletion cluster/calcium/capacity.go
@@ -21,7 +21,7 @@ func (c *Calcium) CalculateCapacity(ctx context.Context, opts *types.DeployOptio
NodeCapacities: map[string]int{},
}

return msg, c.withNodesLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) error {
return msg, c.withNodesResourceLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) error {
if opts.DeployStrategy != strategy.Dummy {
if _, msg.NodeCapacities, err = c.doAllocResource(ctx, nodeMap, opts); err != nil {
logger.Errorf(ctx, "[Calcium.CalculateCapacity] doAllocResource failed: %+v", err)
12 changes: 4 additions & 8 deletions cluster/calcium/create.go
@@ -95,7 +95,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
ch <- &types.CreateWorkloadMessage{Error: logger.Err(ctx, err)}
}
}()
return c.withNodesLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) (err error) {
return c.withNodesResourceLocked(ctx, opts.NodeFilter, func(ctx context.Context, nodeMap map[string]*types.Node) (err error) {
// calculate plans
if plans, deployMap, err = c.doAllocResource(ctx, nodeMap, opts); err != nil {
return err
@@ -136,7 +136,7 @@ func (c *Calcium) doCreateWorkloads(ctx context.Context, opts *types.DeployOptio
return
}
for nodename, rollbackIndices := range rollbackMap {
if e := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
if e := c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
for _, plan := range plans {
plan.RollbackChangesOnNode(node, rollbackIndices...) // nolint:scopelint
}
@@ -249,12 +249,8 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context, ch chan *types.Cr
// Let's keep remap out of the transaction; rolling it back would cost too much.
// The consequence of letting remap fail is that the share pool is not updated, which we consider acceptable.
// Moreover, remap is idempotent: even if this remap fails, the next one will converge to the correct state.
if err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
c.doRemapResourceAndLog(ctx, logger, node)
return nil
}); err != nil {
logger.Errorf(ctx, "failed to lock node to remap: %v", err)
}
go c.doRemapResourceAndLog(ctx, logger, node)

return indices, err
}

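The remap above is now fired as a detached goroutine instead of re-acquiring the node lock and running synchronously. A rough sketch of the fire-and-forget pattern for an idempotent, best-effort task — the helper name and signature are illustrative only, not the actual Calcium API:

import (
	"context"
	"log"
)

// runBestEffort is an illustrative helper (not part of Calcium): it detaches an
// idempotent task from the caller's critical section. If the task fails, nothing
// is rolled back; the next invocation converges to the correct state, which is
// why it is acceptable to run it outside the transaction and outside the node lock.
func runBestEffort(ctx context.Context, task func(context.Context) error) {
	go func() {
		if err := task(ctx); err != nil {
			log.Printf("best-effort task failed, will converge on next run: %+v", err)
		}
	}()
}

Because the remap goroutine outlives the node lock, this is presumably also why the tests below now sleep briefly before asserting mock expectations.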
4 changes: 2 additions & 2 deletions cluster/calcium/dissociate.go
@@ -25,7 +25,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
defer close(ch)

for nodename, workloadIDs := range nodeWorkloadGroup {
if err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
if err := c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
for _, workloadID := range workloadIDs { // nolint:scopelint
msg := &types.DissociateWorkloadMessage{WorkloadID: workloadID} // nolint:scopelint
if err := c.withWorkloadLocked(ctx, workloadID, func(ctx context.Context, workload *types.Workload) error {
@@ -57,7 +57,7 @@ func (c *Calcium) DissociateWorkload(ctx context.Context, ids []string) (chan *t
}
ch <- msg
}
c.doRemapResourceAndLog(ctx, logger, node)
go c.doRemapResourceAndLog(ctx, logger, node)
return nil
}); err != nil {
logger.WithField("nodename", nodename).Errorf(ctx, "failed to lock node: %+v", err)
3 changes: 3 additions & 0 deletions cluster/calcium/dissociate_test.go
@@ -3,6 +3,7 @@ package calcium
import (
"context"
"testing"
"time"

lockmocks "github.com/projecteru2/core/lock/mocks"
storemocks "github.com/projecteru2/core/store/mocks"
@@ -66,9 +67,11 @@ func TestDissociateWorkload(t *testing.T) {
for r := range ch {
assert.Error(t, r.Error)
}
time.Sleep(time.Second)
store.AssertExpectations(t)

store.On("RemoveWorkload", mock.Anything, mock.Anything).Return(nil)

// success
ch, err = c.DissociateWorkload(ctx, []string{"c1"})
assert.NoError(t, err)
24 changes: 20 additions & 4 deletions cluster/calcium/lock.go
@@ -65,12 +65,20 @@ func (c *Calcium) withWorkloadLocked(ctx context.Context, id string, f func(cont
})
}

func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error {
func (c *Calcium) withNodeResourceLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error {
return c.withNodeLocked(ctx, nodename, cluster.NodeResourceLock, f)
}

func (c *Calcium) withNodeOperationLocked(ctx context.Context, nodename string, f func(context.Context, *types.Node) error) error {
return c.withNodeLocked(ctx, nodename, cluster.NodeOperationLock, f)
}

func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, lockKeyPattern string, f func(context.Context, *types.Node) error) error {
nf := types.NodeFilter{
Includes: []string{nodename},
All: true,
}
return c.withNodesLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error {
return c.withNodesLocked(ctx, nf, lockKeyPattern, func(ctx context.Context, nodes map[string]*types.Node) error {
if n, ok := nodes[nodename]; ok {
return f(ctx, n)
}
@@ -108,9 +116,17 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(
return f(ctx, workloads)
}

func (c *Calcium) withNodesOperationLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error { // nolint
return c.withNodesLocked(ctx, nf, cluster.NodeOperationLock, f)
}

func (c *Calcium) withNodesResourceLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error {
return c.withNodesLocked(ctx, nf, cluster.NodeResourceLock, f)
}

// withNodesLocked will using NodeFilter `nf` to filter nodes
// and lock the corresponding nodes for the callback function `f` to use
func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error {
func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, lockKeyPattern string, f func(context.Context, map[string]*types.Node) error) error {
nodenames := []string{}
nodes := map[string]*types.Node{}
locks := map[string]lock.DistributedLock{}
@@ -127,7 +143,7 @@ func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f fu

var lock lock.DistributedLock
for _, n := range ns {
lock, ctx, err = c.doLock(ctx, fmt.Sprintf(cluster.NodeLock, n.Podname, n.Name), c.config.LockTimeout)
lock, ctx, err = c.doLock(ctx, fmt.Sprintf(lockKeyPattern, n.Podname, n.Name), c.config.LockTimeout)
if err != nil {
return err
}
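For context on the change above: withNodesLocked now takes a lockKeyPattern, and the new withNode(s)ResourceLocked / withNode(s)OperationLocked helpers select between two key patterns, cluster.NodeResourceLock and cluster.NodeOperationLock, in place of the single cluster.NodeLock. A minimal sketch of what those patterns might look like — the concrete strings live in the cluster package and are assumed here purely for illustration:

package cluster

// Hypothetical key formats; the real values are defined in the cluster package.
// Both take (podname, nodename). What matters is that they expand to different
// distributed-lock keys for the same node, so a resource lock and an operation
// lock on that node never contend with each other.
const (
	// NodeResourceLock guards resource accounting on a node
	// (create, realloc, dissociate, remove workload, set node, remove pod ...).
	NodeResourceLock = "node-resource-%s-%s"
	// NodeOperationLock guards other node-level operations that should not
	// have to wait on resource bookkeeping.
	NodeOperationLock = "node-operation-%s-%s"
)

Because the two patterns yield distinct keys, withNodeResourceLocked can safely nest withNodeOperationLocked on the same node; the new TestWithNodeOperationLocked below exercises exactly that nesting.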
117 changes: 107 additions & 10 deletions cluster/calcium/lock_test.go
@@ -116,7 +116,7 @@ func TestWithWorkloadLocked(t *testing.T) {
assert.NoError(t, err)
}

func TestWithNodesLocked(t *testing.T) {
func TestWithNodesResourceLocked(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
store := &storemocks.Store{}
@@ -133,12 +133,12 @@
}
// failed by list nodes
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, types.ErrNoETCD).Once()
err := c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
err := c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil).Once()
// failed by filter
var ns map[string]*types.Node
err = c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", Labels: map[string]string{"eru": "2"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Labels: map[string]string{"eru": "2"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
ns = nodes
return nil
})
@@ -147,7 +147,7 @@
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil)
// failed by getnode
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err = c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil).Once()
// failed by lock
@@ -156,23 +156,23 @@
lock.On("Unlock", mock.Anything).Return(nil)
// failed to get lock
lock.On("Lock", mock.Anything).Return(context.TODO(), types.ErrNoETCD).Once()
err = c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
// failed by get locked node
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err = c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil)
// success
err = c.withNodesLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
err = c.withNodesResourceLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
assert.Len(t, nodes, 1)
return nil
})
assert.NoError(t, err)
}

func TestWithNodeLocked(t *testing.T) {
func TestWithNodeResourceLocked(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
store := &storemocks.Store{}
@@ -194,13 +194,110 @@ func TestWithNodeLocked(t *testing.T) {
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
// failed by get locked node
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err := c.withNodeLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { return nil })
err := c.withNodeResourceLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil)
// success
err = c.withNodeLocked(ctx, "test", func(ctx context.Context, node *types.Node) error {
err = c.withNodeResourceLocked(ctx, "test", func(ctx context.Context, node *types.Node) error {
assert.Equal(t, node.Name, node1.Name)
return nil
})
assert.NoError(t, err)
}

func TestWithNodesOperationLocked(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
store := &storemocks.Store{}
c.store = store

node1 := &types.Node{
NodeMeta: types.NodeMeta{
Name: "test",
Labels: map[string]string{
"eru": "1",
},
},
Available: true,
}
// failed by list nodes
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, types.ErrNoETCD).Once()
err := c.withNodesOperationLocked(ctx, types.NodeFilter{Podname: "test", All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil).Once()
// failed by filter
var ns map[string]*types.Node
err = c.withNodesOperationLocked(ctx, types.NodeFilter{Podname: "test", Labels: map[string]string{"eru": "2"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
ns = nodes
return nil
})
assert.NoError(t, err)
assert.Empty(t, ns)
store.On("GetNodesByPod", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*types.Node{}, nil)
// failed by getnode
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err = c.withNodesOperationLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil).Once()
// failed by lock
lock := &lockmocks.DistributedLock{}
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
lock.On("Unlock", mock.Anything).Return(nil)
// failed to get lock
lock.On("Lock", mock.Anything).Return(context.TODO(), types.ErrNoETCD).Once()
err = c.withNodesOperationLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
// failed by get locked node
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err = c.withNodesOperationLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil)
// success
err = c.withNodesOperationLocked(ctx, types.NodeFilter{Podname: "test", Includes: []string{"test"}, All: false}, func(ctx context.Context, nodes map[string]*types.Node) error {
assert.Len(t, nodes, 1)
return nil
})
assert.NoError(t, err)
}

func TestWithNodeOperationLocked(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
store := &storemocks.Store{}
c.store = store

node1 := &types.Node{
NodeMeta: types.NodeMeta{
Name: "test",
Labels: map[string]string{
"eru": "1",
},
},
Available: true,
}
// failed by lock
lock := &lockmocks.DistributedLock{}
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
lock.On("Unlock", mock.Anything).Return(nil)
lock.On("Lock", mock.Anything).Return(context.TODO(), nil)
// failed by get locked node
store.On("GetNode", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err := c.withNodeOperationLocked(ctx, "test", func(ctx context.Context, node *types.Node) error { return nil })
assert.Error(t, err)
store.On("GetNode", mock.Anything, mock.Anything).Return(node1, nil)
// success
err = c.withNodeOperationLocked(ctx, "test", func(ctx context.Context, node *types.Node) error {
assert.Equal(t, node.Name, node1.Name)
return nil
})
assert.NoError(t, err)

err = c.withNodeResourceLocked(ctx, "test", func(ctx context.Context, node *types.Node) error {
return c.withNodeOperationLocked(ctx, node.Name, func(ctx context.Context, node *types.Node) error {
assert.Equal(t, node.Name, node1.Name)
return nil
})
})
assert.NoError(t, err)
}
4 changes: 2 additions & 2 deletions cluster/calcium/node.go
@@ -28,7 +28,7 @@ func (c *Calcium) RemoveNode(ctx context.Context, nodename string) error {
if nodename == "" {
return logger.Err(ctx, errors.WithStack(types.ErrEmptyNodeName))
}
return c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
return c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
ws, err := c.ListNodeWorkloads(ctx, node.Name, nil)
if err != nil {
return logger.Err(ctx, err)
@@ -91,7 +91,7 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
return nil, logger.Err(ctx, err)
}
var n *types.Node
return n, c.withNodeLocked(ctx, opts.Nodename, func(ctx context.Context, node *types.Node) error {
return n, c.withNodeResourceLocked(ctx, opts.Nodename, func(ctx context.Context, node *types.Node) error {
logger.Infof(ctx, "set node")
opts.Normalize(node)
n = node
2 changes: 1 addition & 1 deletion cluster/calcium/pod.go
@@ -30,7 +30,7 @@ func (c *Calcium) RemovePod(ctx context.Context, podname string) error {
Podname: podname,
All: true,
}
return c.withNodesLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error {
return c.withNodesResourceLocked(ctx, nf, func(ctx context.Context, nodes map[string]*types.Node) error {
// TODO dissociate workload to node
// TODO should remove node first
return logger.Err(ctx, errors.WithStack(c.store.RemovePod(ctx, podname)))
4 changes: 2 additions & 2 deletions cluster/calcium/realloc.go
@@ -21,7 +21,7 @@ func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOption
if err != nil {
return
}
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error {
return c.withNodeResourceLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error {

return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error {
rrs, err := resources.MakeRequests(
@@ -108,7 +108,7 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, node *types.Node, workloa
return
}

c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
go c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
return nil
}

2 changes: 2 additions & 0 deletions cluster/calcium/realloc_test.go
@@ -3,6 +3,7 @@ package calcium
import (
"context"
"testing"
"time"

enginemocks "github.com/projecteru2/core/engine/mocks"
enginetypes "github.com/projecteru2/core/engine/types"
@@ -362,6 +363,7 @@ func TestReallocBindCpu(t *testing.T) {
err = c.ReallocResource(ctx, newReallocOptions("c5", 0.1, 2*int64(units.MiB), nil, types.TriFalse, types.TriKeep))
assert.NoError(t, err)
assert.Equal(t, 0, len(c5.ResourceMeta.CPU))
time.Sleep(time.Second)
store.AssertExpectations(t)

store.On("GetWorkload", mock.Anything, "c6").Return(c6, nil)
4 changes: 2 additions & 2 deletions cluster/calcium/remove.go
@@ -33,7 +33,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
utils.SentryGo(func(nodename string, workloadIDs []string) func() {
return func() {
defer wg.Done()
if err := c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
if err := c.withNodeResourceLocked(ctx, nodename, func(ctx context.Context, node *types.Node) error {
for _, workloadID := range workloadIDs {
ret := &types.RemoveWorkloadMessage{WorkloadID: workloadID, Success: true, Hook: []*bytes.Buffer{}}
if err := c.withWorkloadLocked(ctx, workloadID, func(ctx context.Context, workload *types.Workload) error {
@@ -66,7 +66,7 @@ func (c *Calcium) RemoveWorkload(ctx context.Context, ids []string, force bool,
}
ch <- ret
}
c.doRemapResourceAndLog(ctx, logger, node)
go c.doRemapResourceAndLog(ctx, logger, node)
return nil
}); err != nil {
logger.WithField("nodename", nodename).Errorf(ctx, "failed to lock node: %+v", err)
7 changes: 1 addition & 6 deletions cluster/calcium/replace.go
@@ -206,12 +206,7 @@ func (c *Calcium) doReplaceWorkload(
return createMessage, removeMessage, err
}

if err := c.withNodeLocked(ctx, node.Name, func(_ context.Context, node *types.Node) error {
c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReplaceWorkload"), node)
return nil
}); err != nil {
log.Errorf(ctx, "[replaceAndRemove] failed to lock node to remap: %v", err)
}
go c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReplaceWorkload"), node)

return createMessage, removeMessage, err
}