Resolve potential deadlocks (#438)
* sort & unique the nodes and workloads before locking

* make sure to lock nodes before locking workloads

* unlock the mutexes in reverse order

* add tests
jschwinger233 authored Jun 24, 2021
1 parent c8630ed commit 1664108
Showing 7 changed files with 193 additions and 97 deletions.
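
The commit message boils down to the classic discipline for holding several locks at once: acquire same-kind locks in one deterministic (sorted, deduplicated) order, always take node locks before workload locks, and release everything in reverse acquisition order. A minimal, self-contained Go sketch of that discipline, using hypothetical names rather than the eru-core API:

package main

import (
	"fmt"
	"sort"
	"sync"
)

// lockAll acquires the named mutexes in sorted, deduplicated order and
// returns the keys in acquisition order so the caller can unlock in reverse.
func lockAll(locks map[string]*sync.Mutex, names []string) []string {
	sort.Strings(names)
	acquired := make([]string, 0, len(names))
	for i, name := range names {
		if i > 0 && name == names[i-1] { // skip duplicates after sorting
			continue
		}
		locks[name].Lock()
		acquired = append(acquired, name)
	}
	return acquired
}

// unlockAll releases in the reverse of acquisition order.
func unlockAll(locks map[string]*sync.Mutex, acquired []string) {
	for i := len(acquired) - 1; i >= 0; i-- {
		locks[acquired[i]].Unlock()
	}
}

func main() {
	nodeLocks := map[string]*sync.Mutex{"node1": {}, "node2": {}}
	workloadLocks := map[string]*sync.Mutex{"w1": {}, "w2": {}}

	// Nodes before workloads, each group in sorted order.
	nodes := lockAll(nodeLocks, []string{"node2", "node1", "node1"})
	workloads := lockAll(workloadLocks, []string{"w2", "w1"})
	defer unlockAll(nodeLocks, nodes)         // runs last: nodes released after workloads
	defer unlockAll(workloadLocks, workloads) // runs first: reverse of acquisition

	fmt.Println("critical section entered with a consistent lock order")
}

As long as every caller follows the same node-then-workload order, two concurrent operations can never each hold a lock the other is waiting for.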
35 changes: 28 additions & 7 deletions cluster/calcium/lock.go
@@ -3,6 +3,7 @@ package calcium
import (
"context"
"fmt"
"sort"
"time"

"github.com/pkg/errors"
@@ -37,11 +38,18 @@ func (c *Calcium) doUnlock(ctx context.Context, lock lock.DistributedLock, msg s
return errors.WithStack(lock.Unlock(ctx))
}

func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.DistributedLock) {
for n, lock := range locks {
// force unlock
if err := c.doUnlock(ctx, lock, n); err != nil {
log.Errorf(ctx, "[doUnlockAll] Unlock failed %v", err)
func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.DistributedLock, order ...string) {
// unlock in the reverse order
if len(order) != len(locks) {
log.Warn(ctx, "[doUnlockAll] order length not match lock map")
order = []string{}
for key := range locks {
order = append(order, key)
}
}
for _, key := range order {
if err := c.doUnlock(ctx, locks[key], key); err != nil {
log.Errorf(ctx, "[doUnlockAll] Unlock %s failed %v", key, err)
continue
}
}
@@ -72,8 +80,16 @@ func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, f func(co
func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(context.Context, map[string]*types.Workload) error) error {
workloads := map[string]*types.Workload{}
locks := map[string]lock.DistributedLock{}

// sort + unique
sort.Strings(ids)
ids = ids[:utils.Unique(ids, func(i int) string { return ids[i] })]

defer log.Debugf(ctx, "[withWorkloadsLocked] Workloads %+v unlocked", ids)
defer func() { c.doUnlockAll(utils.InheritTracingInfo(ctx, context.Background()), locks) }()
defer func() {
utils.Reverse(ids)
c.doUnlockAll(utils.InheritTracingInfo(ctx, context.TODO()), locks, ids...)
}()
cs, err := c.GetWorkloads(ctx, ids)
if err != nil {
return err
@@ -94,10 +110,14 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(
// withNodesLocked will using NodeFilter `nf` to filter nodes
// and lock the corresponding nodes for the callback function `f` to use
func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error {
nodenames := []string{}
nodes := map[string]*types.Node{}
locks := map[string]lock.DistributedLock{}
defer log.Debugf(ctx, "[withNodesLocked] Nodes %+v unlocked", nf)
defer c.doUnlockAll(utils.InheritTracingInfo(ctx, context.Background()), locks)
defer func() {
utils.Reverse(nodenames)
c.doUnlockAll(utils.InheritTracingInfo(ctx, context.TODO()), locks, nodenames...)
}()

ns, err := c.filterNodes(ctx, nf)
if err != nil {
@@ -118,6 +138,7 @@ func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f fu
return err
}
nodes[n.Name] = node
nodenames = append(nodenames, n.Name)
}
return f(ctx, nodes)
}
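
The withWorkloadsLocked change above leans on two small helpers: utils.Unique, which appears to compact an already sorted slice in place by key and return the length of the deduplicated prefix (hence the re-slice ids = ids[:utils.Unique(...)]), and utils.Reverse, which flips the slice so unlocking happens in reverse acquisition order. A stand-alone sketch of that sort-and-compact contract for plain string IDs, offered as an assumption about the helper's behavior rather than the eru-core implementation:

package main

import (
	"fmt"
	"sort"
)

// uniqueInPlace keeps the first occurrence of each value in an already
// sorted string slice, compacting in place, and returns the new length so
// the caller can re-slice. This mirrors the contract lock.go appears to
// rely on from utils.Unique (the real helper is more general than strings).
func uniqueInPlace(s []string) int {
	j := 0
	for i := range s {
		if i == 0 || s[i] != s[j-1] {
			s[j] = s[i]
			j++
		}
	}
	return j
}

func main() {
	ids := []string{"c3", "c1", "c3", "c2", "c1"}
	sort.Strings(ids)
	ids = ids[:uniqueInPlace(ids)]
	fmt.Println(ids) // [c1 c2 c3]
}

Sorting before locking is what makes the acquisition order deterministic across callers; deduplication avoids trying to take the same lock twice.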
13 changes: 11 additions & 2 deletions cluster/calcium/node.go
@@ -2,6 +2,7 @@ package calcium

import (
"context"
"sort"

"github.com/pkg/errors"
"github.com/projecteru2/core/log"
@@ -176,8 +177,16 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
// filterNodes filters nodes using NodeFilter nf
// the filtering logic is introduced along with NodeFilter
// NOTE: when nf.Includes is set, they don't need to belong to podname
func (c *Calcium) filterNodes(ctx context.Context, nf types.NodeFilter) ([]*types.Node, error) {
ns := []*types.Node{}
// updateon 2021-06-21: sort and unique locks to avoid deadlock
func (c *Calcium) filterNodes(ctx context.Context, nf types.NodeFilter) (ns []*types.Node, err error) {
defer func() {
if len(ns) == 0 {
return
}
sort.Slice(ns, func(i, j int) bool { return ns[i].Name <= ns[j].Name })
// unique
ns = ns[:utils.Unique(ns, func(i int) string { return ns[i].Name })]
}()

if len(nf.Includes) != 0 {
for _, nodename := range nf.Includes {
160 changes: 82 additions & 78 deletions cluster/calcium/realloc.go
@@ -16,94 +16,98 @@ import (
// ReallocResource updates workload resource dynamically
func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOptions) (err error) {
logger := log.WithField("Calcium", "ReallocResource").WithField("opts", opts)
return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error {
rrs, err := resources.MakeRequests(
types.ResourceOptions{
CPUQuotaRequest: utils.Round(workload.CPUQuotaRequest + opts.ResourceOpts.CPUQuotaRequest),
CPUQuotaLimit: utils.Round(workload.CPUQuotaLimit + opts.ResourceOpts.CPUQuotaLimit),
CPUBind: types.ParseTriOption(opts.CPUBindOpts, len(workload.CPU) > 0),
CPU: workload.CPU,
MemoryRequest: workload.MemoryRequest + opts.ResourceOpts.MemoryRequest,
MemoryLimit: workload.MemoryLimit + opts.ResourceOpts.MemoryLimit,
StorageRequest: workload.StorageRequest + opts.ResourceOpts.StorageRequest,
StorageLimit: workload.StorageLimit + opts.ResourceOpts.StorageLimit,
VolumeRequest: types.MergeVolumeBindings(workload.VolumeRequest, opts.ResourceOpts.VolumeRequest),
VolumeLimit: types.MergeVolumeBindings(workload.VolumeLimit, opts.ResourceOpts.VolumeLimit),
VolumeExist: workload.VolumePlanRequest,
},
)
if err != nil {
return logger.Err(ctx, err)
}
return logger.Err(ctx, c.doReallocOnNode(ctx, workload.Nodename, workload, rrs))
workload, err := c.GetWorkload(ctx, opts.ID)
if err != nil {
return
}
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error {
return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error {
rrs, err := resources.MakeRequests(
types.ResourceOptions{
CPUQuotaRequest: utils.Round(workload.CPUQuotaRequest + opts.ResourceOpts.CPUQuotaRequest),
CPUQuotaLimit: utils.Round(workload.CPUQuotaLimit + opts.ResourceOpts.CPUQuotaLimit),
CPUBind: types.ParseTriOption(opts.CPUBindOpts, len(workload.CPU) > 0),
CPU: workload.CPU,
MemoryRequest: workload.MemoryRequest + opts.ResourceOpts.MemoryRequest,
MemoryLimit: workload.MemoryLimit + opts.ResourceOpts.MemoryLimit,
StorageRequest: workload.StorageRequest + opts.ResourceOpts.StorageRequest,
StorageLimit: workload.StorageLimit + opts.ResourceOpts.StorageLimit,
VolumeRequest: types.MergeVolumeBindings(workload.VolumeRequest, opts.ResourceOpts.VolumeRequest),
VolumeLimit: types.MergeVolumeBindings(workload.VolumeLimit, opts.ResourceOpts.VolumeLimit),
VolumeExist: workload.VolumePlanRequest,
},
)
if err != nil {
return logger.Err(ctx, err)
}
return logger.Err(ctx, c.doReallocOnNode(ctx, node, workload, rrs))
})
})
}

// transaction: node resource
func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, workload *types.Workload, rrs resourcetypes.ResourceRequests) error {
return c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) (err error) {
node.RecycleResources(&workload.ResourceMeta)
plans, err := resources.SelectNodesByResourceRequests(ctx, rrs, map[string]*types.Node{node.Name: node})
if err != nil {
return err
}
func (c *Calcium) doReallocOnNode(ctx context.Context, node *types.Node, workload *types.Workload, rrs resourcetypes.ResourceRequests) (err error) {
node.RecycleResources(&workload.ResourceMeta)
plans, err := resources.SelectNodesByResourceRequests(ctx, rrs, map[string]*types.Node{node.Name: node})
if err != nil {
return err
}

originalWorkload := *workload
resourceMeta := &types.ResourceMeta{}
if err = utils.Txn(
ctx,
originalWorkload := *workload
resourceMeta := &types.ResourceMeta{}
if err = utils.Txn(
ctx,

// if update workload resources
func(ctx context.Context) (err error) {
resourceMeta := &types.ResourceMeta{}
for _, plan := range plans {
if resourceMeta, err = plan.Dispense(resourcetypes.DispenseOptions{
Node: node,
}, resourceMeta); err != nil {
return err
}
// if update workload resources
func(ctx context.Context) (err error) {
resourceMeta := &types.ResourceMeta{}
for _, plan := range plans {
if resourceMeta, err = plan.Dispense(resourcetypes.DispenseOptions{
Node: node,
}, resourceMeta); err != nil {
return err
}
}

return c.doReallocWorkloadsOnInstance(ctx, node.Engine, resourceMeta, workload)
},
// then commit changes
func(ctx context.Context) error {
for _, plan := range plans {
plan.ApplyChangesOnNode(node, 0)
}
return errors.WithStack(c.store.UpdateNodes(ctx, node))
},
// no need rollback
func(ctx context.Context, failureByCond bool) (err error) {
if failureByCond {
return
}
r := &types.ResourceMeta{
CPUQuotaRequest: originalWorkload.CPUQuotaRequest,
CPUQuotaLimit: originalWorkload.CPUQuotaLimit,
CPU: originalWorkload.CPU,
NUMANode: originalWorkload.NUMANode,
MemoryRequest: originalWorkload.MemoryRequest,
MemoryLimit: originalWorkload.MemoryLimit,
VolumeRequest: originalWorkload.VolumeRequest,
VolumeLimit: originalWorkload.VolumeLimit,
VolumePlanRequest: originalWorkload.VolumePlanRequest,
VolumePlanLimit: originalWorkload.VolumePlanLimit,
VolumeChanged: resourceMeta.VolumeChanged,
StorageRequest: originalWorkload.StorageRequest,
StorageLimit: originalWorkload.StorageLimit,
}
return c.doReallocWorkloadsOnInstance(ctx, node.Engine, r, workload)
},
return c.doReallocWorkloadsOnInstance(ctx, node.Engine, resourceMeta, workload)
},
// then commit changes
func(ctx context.Context) error {
for _, plan := range plans {
plan.ApplyChangesOnNode(node, 0)
}
return errors.WithStack(c.store.UpdateNodes(ctx, node))
},
// no need rollback
func(ctx context.Context, failureByCond bool) (err error) {
if failureByCond {
return
}
r := &types.ResourceMeta{
CPUQuotaRequest: originalWorkload.CPUQuotaRequest,
CPUQuotaLimit: originalWorkload.CPUQuotaLimit,
CPU: originalWorkload.CPU,
NUMANode: originalWorkload.NUMANode,
MemoryRequest: originalWorkload.MemoryRequest,
MemoryLimit: originalWorkload.MemoryLimit,
VolumeRequest: originalWorkload.VolumeRequest,
VolumeLimit: originalWorkload.VolumeLimit,
VolumePlanRequest: originalWorkload.VolumePlanRequest,
VolumePlanLimit: originalWorkload.VolumePlanLimit,
VolumeChanged: resourceMeta.VolumeChanged,
StorageRequest: originalWorkload.StorageRequest,
StorageLimit: originalWorkload.StorageLimit,
}
return c.doReallocWorkloadsOnInstance(ctx, node.Engine, r, workload)
},

c.config.GlobalTimeout,
); err != nil {
return
}
c.config.GlobalTimeout,
); err != nil {
return
}

c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
return nil
})
c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
return nil
}

func (c *Calcium) doReallocWorkloadsOnInstance(ctx context.Context, engine engine.API, resourceMeta *types.ResourceMeta, workload *types.Workload) (err error) {
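
The rewritten doReallocOnNode drives its changes through utils.Txn: a conditional step (dispense the new resource plan and update the workload), a commit step (apply the plan to the node and persist it), and a rollback step that is told whether the failure happened in the conditional phase, all bounded by c.config.GlobalTimeout. A simplified stand-in for that three-phase flow, with a hypothetical signature rather than the actual utils.Txn:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// txn runs cond, then runs commit if cond succeeded, and invokes rollback on
// any failure, passing whether the failure came from the conditional phase.
// All phases share one timeout. (Sketch only; utils.Txn may differ.)
func txn(
	ctx context.Context,
	cond func(context.Context) error,
	commit func(context.Context) error,
	rollback func(context.Context, bool) error,
	timeout time.Duration,
) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	if err := cond(ctx); err != nil {
		if rbErr := rollback(ctx, true); rbErr != nil {
			fmt.Println("rollback after cond failure also failed:", rbErr)
		}
		return err
	}
	if err := commit(ctx); err != nil {
		if rbErr := rollback(ctx, false); rbErr != nil {
			fmt.Println("rollback after commit failure also failed:", rbErr)
		}
		return err
	}
	return nil
}

func main() {
	err := txn(context.Background(),
		func(ctx context.Context) error { fmt.Println("dispense resources, update workload"); return nil },
		func(ctx context.Context) error { fmt.Println("apply plan to node"); return errors.New("etcd down") },
		func(ctx context.Context, failureByCond bool) error {
			fmt.Println("restore original workload meta, failureByCond =", failureByCond)
			return nil
		},
		time.Minute,
	)
	fmt.Println("txn result:", err)
}

Because the caller already holds the node lock via withNodeLocked, the transaction never has to re-acquire it, which keeps the lock hierarchy flat.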
24 changes: 15 additions & 9 deletions cluster/calcium/realloc_test.go
@@ -97,27 +97,29 @@ func TestRealloc(t *testing.T) {
}
}

store.On("GetWorkloads", mock.Anything, []string{"c1"}).Return(newC1, nil)
store.On("GetWorkload", mock.Anything, "c1").Return(newC1(context.TODO(), nil)[0], nil)

// failed by GetNode
store.On("GetNode", mock.Anything, "node1").Return(nil, types.ErrNoETCD).Once()
err := c.ReallocResource(ctx, newReallocOptions("c1", 0.1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
assert.EqualError(t, err, "ETCD must be set")
store.AssertExpectations(t)
store.On("GetNode", mock.Anything, "node1").Return(node1, nil)

// failed by lock
store.On("CreateLock", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err := c.ReallocResource(ctx, newReallocOptions("c1", -1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
err = c.ReallocResource(ctx, newReallocOptions("c1", -1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
assert.EqualError(t, err, "ETCD must be set")
store.AssertExpectations(t)
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)

// failed by newCPU < 0
store.On("GetWorkloads", mock.Anything, []string{"c1"}).Return(newC1, nil)
err = c.ReallocResource(ctx, newReallocOptions("c1", -1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
assert.EqualError(t, err, "limit or request less than 0: bad `CPU` value")
store.AssertExpectations(t)

// failed by GetNode
store.On("GetNode", mock.Anything, "node1").Return(nil, types.ErrNoETCD).Once()
err = c.ReallocResource(ctx, newReallocOptions("c1", 0.1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
assert.EqualError(t, err, "ETCD must be set")
store.AssertExpectations(t)

// failed by no enough mem
store.On("GetNode", mock.Anything, "node1").Return(node1, nil)
simpleMockScheduler := &schedulermocks.Scheduler{}
scheduler.InitSchedulerV1(simpleMockScheduler)
c.scheduler = simpleMockScheduler
Expand Down Expand Up @@ -165,6 +167,7 @@ func TestRealloc(t *testing.T) {
},
Engine: engine,
}
store.On("GetWorkload", mock.Anything, "c2").Return(newC2(nil, nil)[0], nil)
store.On("GetWorkloads", mock.Anything, []string{"c2"}).Return(newC2, nil)
err = c.ReallocResource(ctx, newReallocOptions("c2", 0.1, 2*int64(units.MiB), nil, types.TriKeep, types.TriKeep))
assert.EqualError(t, err, "workload ID must be length of 64")
Expand Down Expand Up @@ -251,6 +254,7 @@ func TestRealloc(t *testing.T) {
simpleMockScheduler.On("ReselectCPUNodes", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(scheduleInfos[0], nodeCPUPlans, 2, nil)
simpleMockScheduler.On("ReselectVolumeNodes", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(scheduleInfos[0], nodeVolumePlans, 2, nil).Once()
store.On("GetNode", mock.Anything, "node2").Return(node2, nil)
store.On("GetWorkload", mock.Anything, "c3").Return(c3, nil)
store.On("GetWorkloads", mock.Anything, []string{"c3"}).Return([]*types.Workload{c3}, nil)
store.On("UpdateWorkload", mock.Anything, mock.Anything).Return(types.ErrBadWorkloadID).Times(1)
err = c.ReallocResource(ctx, newReallocOptions("c3", 0.1, 2*int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data0:rw:-50"}), types.TriKeep, types.TriKeep))
Expand Down Expand Up @@ -341,6 +345,7 @@ func TestReallocBindCpu(t *testing.T) {
}

store.On("GetNode", mock.Anything, "node3").Return(node3, nil)
store.On("GetWorkload", mock.Anything, "c5").Return(c5, nil)
store.On("GetWorkloads", mock.Anything, []string{"c5"}).Return([]*types.Workload{c5}, nil)
engine.On("VirtualizationUpdateResource", mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("UpdateWorkload", mock.Anything, mock.Anything).Return(nil)
@@ -358,6 +363,7 @@ func TestReallocBindCpu(t *testing.T) {
assert.Equal(t, 0, len(c5.ResourceMeta.CPU))
store.AssertExpectations(t)

store.On("GetWorkload", mock.Anything, "c6").Return(c6, nil)
store.On("GetWorkloads", mock.Anything, []string{"c6"}).Return([]*types.Workload{c6}, nil)
err = c.ReallocResource(ctx, newReallocOptions("c6", 0.1, 2*int64(units.MiB), nil, types.TriTrue, types.TriKeep))
assert.NoError(t, err)
3 changes: 2 additions & 1 deletion scheduler/complex/potassium.go
@@ -124,6 +124,7 @@ func (m *Potassium) SelectCPUNodes(ctx context.Context, scheduleInfos []resource

// ReselectCPUNodes used for realloc one container with cpu affinity
func (m *Potassium) ReselectCPUNodes(ctx context.Context, scheduleInfo resourcetypes.ScheduleInfo, CPU types.CPUMap, quota float64, memory int64) (resourcetypes.ScheduleInfo, map[string][]types.CPUMap, int, error) {
log.Infof(ctx, "[SelectCPUNodes] scheduleInfo %v, need cpu %f, need memory %d, existing %v", scheduleInfo, quota, memory, CPU)
var affinityPlan types.CPUMap
// remaining quota that's impossible to achieve affinity
if scheduleInfo, quota, affinityPlan = cpuReallocPlan(scheduleInfo, quota, CPU, int64(m.sharebase)); quota == 0 {
@@ -325,7 +326,7 @@ func (m *Potassium) SelectVolumeNodes(ctx context.Context, scheduleInfos []resou

// ReselectVolumeNodes is used for realloc only
func (m *Potassium) ReselectVolumeNodes(ctx context.Context, scheduleInfo resourcetypes.ScheduleInfo, existing types.VolumePlan, vbsReq types.VolumeBindings) (resourcetypes.ScheduleInfo, map[string][]types.VolumePlan, int, error) {

log.Infof(ctx, "[ReselectVolumeNodes] scheduleInfo %v, need volume: %v, existing %v", scheduleInfo, vbsReq.ToStringSlice(true, true), existing.ToLiteral())
affinityPlan := types.VolumePlan{}
needReschedule := types.VolumeBindings{}
norm, mono, unlim := distinguishVolumeBindings(vbsReq)