Resolve potential deadlocks #438

Merged (4 commits) on Jun 24, 2021
35 changes: 28 additions & 7 deletions cluster/calcium/lock.go
@@ -3,6 +3,7 @@ package calcium
import (
"context"
"fmt"
"sort"
"time"

"github.com/pkg/errors"
@@ -37,11 +38,18 @@ func (c *Calcium) doUnlock(ctx context.Context, lock lock.DistributedLock, msg s
return errors.WithStack(lock.Unlock(ctx))
}

func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.DistributedLock) {
for n, lock := range locks {
// force unlock
if err := c.doUnlock(ctx, lock, n); err != nil {
log.Errorf(ctx, "[doUnlockAll] Unlock failed %v", err)
func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.DistributedLock, order ...string) {
// unlock in the reverse order
if len(order) != len(locks) {
log.Warn(ctx, "[doUnlockAll] order length not match lock map")
order = []string{}
for key := range locks {
order = append(order, key)
}
}
for _, key := range order {
if err := c.doUnlock(ctx, locks[key], key); err != nil {
log.Errorf(ctx, "[doUnlockAll] Unlock %s failed %v", key, err)
continue
}
}
@@ -72,8 +80,16 @@ func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, f func(co
func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(context.Context, map[string]*types.Workload) error) error {
workloads := map[string]*types.Workload{}
locks := map[string]lock.DistributedLock{}

// sort + unique
sort.Strings(ids)
ids = ids[:utils.Unique(ids, func(i int) string { return ids[i] })]

defer log.Debugf(ctx, "[withWorkloadsLocked] Workloads %+v unlocked", ids)
defer func() { c.doUnlockAll(utils.InheritTracingInfo(ctx, context.Background()), locks) }()
defer func() {
utils.Reverse(ids)
c.doUnlockAll(utils.InheritTracingInfo(ctx, context.TODO()), locks, ids...)
}()
cs, err := c.GetWorkloads(ctx, ids)
if err != nil {
return err
@@ -94,10 +110,14 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(
// withNodesLocked will use NodeFilter `nf` to filter nodes
// and lock the corresponding nodes for the callback function `f` to use
func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error {
nodenames := []string{}
nodes := map[string]*types.Node{}
locks := map[string]lock.DistributedLock{}
defer log.Debugf(ctx, "[withNodesLocked] Nodes %+v unlocked", nf)
defer c.doUnlockAll(utils.InheritTracingInfo(ctx, context.Background()), locks)
defer func() {
utils.Reverse(nodenames)
c.doUnlockAll(utils.InheritTracingInfo(ctx, context.TODO()), locks, nodenames...)
}()

ns, err := c.filterNodes(ctx, nf)
if err != nil {
@@ -118,6 +138,7 @@ func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f fu
return err
}
nodes[n.Name] = node
nodenames = append(nodenames, n.Name)
}
return f(ctx, nodes)
}
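Taken together, the lock.go changes establish one locking discipline: callers build their lock maps over sorted, de-duplicated keys, and doUnlockAll now releases in an explicit (reverse) order instead of ranging over the map. A minimal, self-contained sketch of that discipline, assuming plain sync.Mutex values and made-up keys as stand-ins for the project's lock.DistributedLock instances:

package main

import (
	"fmt"
	"sort"
	"sync"
)

func main() {
	// Stand-ins for the locks map built by withWorkloadsLocked; keys are
	// hypothetical workload IDs.
	registry := map[string]*sync.Mutex{
		"workload-b": {},
		"workload-a": {},
		"workload-c": {},
	}

	// Acquire in one global order (sorted keys), so no two callers can each
	// hold a lock the other needs next.
	keys := make([]string, 0, len(registry))
	for k := range registry {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		registry[k].Lock()
		fmt.Println("locked", k)
	}

	// Release in reverse acquisition order, mirroring the explicit `order`
	// argument doUnlockAll now accepts.
	for i := len(keys) - 1; i >= 0; i-- {
		registry[keys[i]].Unlock()
		fmt.Println("unlocked", keys[i])
	}
}

A single global acquisition order removes the circular-wait condition, which is why ranging over the map (whose iteration order Go randomizes) was worth replacing.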
13 changes: 11 additions & 2 deletions cluster/calcium/node.go
@@ -2,6 +2,7 @@ package calcium

import (
"context"
"sort"

"github.com/pkg/errors"
"github.com/projecteru2/core/log"
@@ -176,8 +177,16 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
// filterNodes filters nodes using NodeFilter nf
// the filtering logic is introduced along with NodeFilter
// NOTE: when nf.Includes is set, they don't need to belong to podname
func (c *Calcium) filterNodes(ctx context.Context, nf types.NodeFilter) ([]*types.Node, error) {
ns := []*types.Node{}
// update on 2021-06-21: sort and unique locks to avoid deadlock
func (c *Calcium) filterNodes(ctx context.Context, nf types.NodeFilter) (ns []*types.Node, err error) {
defer func() {
if len(ns) == 0 {
return
}
sort.Slice(ns, func(i, j int) bool { return ns[i].Name <= ns[j].Name })
// unique
ns = ns[:utils.Unique(ns, func(i int) string { return ns[i].Name })]
}()

if len(nf.Includes) != 0 {
for _, nodename := range nf.Includes {
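filterNodes now hands back nodes sorted by name and de-duplicated, so withNodesLocked acquires node locks in the same global order on every path. A sketch of that sort-then-unique step, assuming utils.Unique compacts an already sorted slice in place and returns the length of its distinct prefix (uniqueByName below is a hypothetical stand-in with that contract):

package main

import (
	"fmt"
	"sort"
)

type Node struct{ Name string }

// uniqueByName drops consecutive duplicates (by Name) from a sorted slice
// and returns the length of the deduplicated prefix.
func uniqueByName(ns []*Node) int {
	j := 0
	for i := range ns {
		if i == 0 || ns[i].Name != ns[j-1].Name {
			ns[j] = ns[i]
			j++
		}
	}
	return j
}

func main() {
	ns := []*Node{{Name: "node2"}, {Name: "node1"}, {Name: "node2"}}
	sort.Slice(ns, func(i, j int) bool { return ns[i].Name < ns[j].Name })
	ns = ns[:uniqueByName(ns)]
	for _, n := range ns {
		fmt.Println(n.Name) // prints node1, then node2
	}
}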
160 changes: 82 additions & 78 deletions cluster/calcium/realloc.go
@@ -16,94 +16,98 @@ import (
// ReallocResource updates workload resource dynamically
func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOptions) (err error) {
logger := log.WithField("Calcium", "ReallocResource").WithField("opts", opts)
return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error {
rrs, err := resources.MakeRequests(
types.ResourceOptions{
CPUQuotaRequest: utils.Round(workload.CPUQuotaRequest + opts.ResourceOpts.CPUQuotaRequest),
CPUQuotaLimit: utils.Round(workload.CPUQuotaLimit + opts.ResourceOpts.CPUQuotaLimit),
CPUBind: types.ParseTriOption(opts.CPUBindOpts, len(workload.CPU) > 0),
CPU: workload.CPU,
MemoryRequest: workload.MemoryRequest + opts.ResourceOpts.MemoryRequest,
MemoryLimit: workload.MemoryLimit + opts.ResourceOpts.MemoryLimit,
StorageRequest: workload.StorageRequest + opts.ResourceOpts.StorageRequest,
StorageLimit: workload.StorageLimit + opts.ResourceOpts.StorageLimit,
VolumeRequest: types.MergeVolumeBindings(workload.VolumeRequest, opts.ResourceOpts.VolumeRequest),
VolumeLimit: types.MergeVolumeBindings(workload.VolumeLimit, opts.ResourceOpts.VolumeLimit),
VolumeExist: workload.VolumePlanRequest,
},
)
if err != nil {
return logger.Err(ctx, err)
}
return logger.Err(ctx, c.doReallocOnNode(ctx, workload.Nodename, workload, rrs))
workload, err := c.GetWorkload(ctx, opts.ID)
if err != nil {
return
}
return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error {
return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error {
rrs, err := resources.MakeRequests(
types.ResourceOptions{
CPUQuotaRequest: utils.Round(workload.CPUQuotaRequest + opts.ResourceOpts.CPUQuotaRequest),
CPUQuotaLimit: utils.Round(workload.CPUQuotaLimit + opts.ResourceOpts.CPUQuotaLimit),
CPUBind: types.ParseTriOption(opts.CPUBindOpts, len(workload.CPU) > 0),
CPU: workload.CPU,
MemoryRequest: workload.MemoryRequest + opts.ResourceOpts.MemoryRequest,
MemoryLimit: workload.MemoryLimit + opts.ResourceOpts.MemoryLimit,
StorageRequest: workload.StorageRequest + opts.ResourceOpts.StorageRequest,
StorageLimit: workload.StorageLimit + opts.ResourceOpts.StorageLimit,
VolumeRequest: types.MergeVolumeBindings(workload.VolumeRequest, opts.ResourceOpts.VolumeRequest),
VolumeLimit: types.MergeVolumeBindings(workload.VolumeLimit, opts.ResourceOpts.VolumeLimit),
VolumeExist: workload.VolumePlanRequest,
},
)
if err != nil {
return logger.Err(ctx, err)
}
return logger.Err(ctx, c.doReallocOnNode(ctx, node, workload, rrs))
})
})
}

// transaction: node resource
func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, workload *types.Workload, rrs resourcetypes.ResourceRequests) error {
return c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) (err error) {
node.RecycleResources(&workload.ResourceMeta)
plans, err := resources.SelectNodesByResourceRequests(ctx, rrs, map[string]*types.Node{node.Name: node})
if err != nil {
return err
}
func (c *Calcium) doReallocOnNode(ctx context.Context, node *types.Node, workload *types.Workload, rrs resourcetypes.ResourceRequests) (err error) {
node.RecycleResources(&workload.ResourceMeta)
plans, err := resources.SelectNodesByResourceRequests(ctx, rrs, map[string]*types.Node{node.Name: node})
if err != nil {
return err
}

originalWorkload := *workload
resourceMeta := &types.ResourceMeta{}
if err = utils.Txn(
ctx,
originalWorkload := *workload
resourceMeta := &types.ResourceMeta{}
if err = utils.Txn(
ctx,

// if update workload resources
func(ctx context.Context) (err error) {
resourceMeta := &types.ResourceMeta{}
for _, plan := range plans {
if resourceMeta, err = plan.Dispense(resourcetypes.DispenseOptions{
Node: node,
}, resourceMeta); err != nil {
return err
}
// if update workload resources
func(ctx context.Context) (err error) {
resourceMeta := &types.ResourceMeta{}
for _, plan := range plans {
if resourceMeta, err = plan.Dispense(resourcetypes.DispenseOptions{
Node: node,
}, resourceMeta); err != nil {
return err
}
}

return c.doReallocWorkloadsOnInstance(ctx, node.Engine, resourceMeta, workload)
},
// then commit changes
func(ctx context.Context) error {
for _, plan := range plans {
plan.ApplyChangesOnNode(node, 0)
}
return errors.WithStack(c.store.UpdateNodes(ctx, node))
},
// no need rollback
func(ctx context.Context, failureByCond bool) (err error) {
if failureByCond {
return
}
r := &types.ResourceMeta{
CPUQuotaRequest: originalWorkload.CPUQuotaRequest,
CPUQuotaLimit: originalWorkload.CPUQuotaLimit,
CPU: originalWorkload.CPU,
NUMANode: originalWorkload.NUMANode,
MemoryRequest: originalWorkload.MemoryRequest,
MemoryLimit: originalWorkload.MemoryLimit,
VolumeRequest: originalWorkload.VolumeRequest,
VolumeLimit: originalWorkload.VolumeLimit,
VolumePlanRequest: originalWorkload.VolumePlanRequest,
VolumePlanLimit: originalWorkload.VolumePlanLimit,
VolumeChanged: resourceMeta.VolumeChanged,
StorageRequest: originalWorkload.StorageRequest,
StorageLimit: originalWorkload.StorageLimit,
}
return c.doReallocWorkloadsOnInstance(ctx, node.Engine, r, workload)
},
return c.doReallocWorkloadsOnInstance(ctx, node.Engine, resourceMeta, workload)
},
// then commit changes
func(ctx context.Context) error {
for _, plan := range plans {
plan.ApplyChangesOnNode(node, 0)
}
return errors.WithStack(c.store.UpdateNodes(ctx, node))
},
// no need rollback
func(ctx context.Context, failureByCond bool) (err error) {
if failureByCond {
return
}
r := &types.ResourceMeta{
CPUQuotaRequest: originalWorkload.CPUQuotaRequest,
CPUQuotaLimit: originalWorkload.CPUQuotaLimit,
CPU: originalWorkload.CPU,
NUMANode: originalWorkload.NUMANode,
MemoryRequest: originalWorkload.MemoryRequest,
MemoryLimit: originalWorkload.MemoryLimit,
VolumeRequest: originalWorkload.VolumeRequest,
VolumeLimit: originalWorkload.VolumeLimit,
VolumePlanRequest: originalWorkload.VolumePlanRequest,
VolumePlanLimit: originalWorkload.VolumePlanLimit,
VolumeChanged: resourceMeta.VolumeChanged,
StorageRequest: originalWorkload.StorageRequest,
StorageLimit: originalWorkload.StorageLimit,
}
return c.doReallocWorkloadsOnInstance(ctx, node.Engine, r, workload)
},

c.config.GlobalTimeout,
); err != nil {
return
}
c.config.GlobalTimeout,
); err != nil {
return
}

c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
return nil
})
c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
return nil
}

func (c *Calcium) doReallocWorkloadsOnInstance(ctx context.Context, engine engine.API, resourceMeta *types.ResourceMeta, workload *types.Workload) (err error) {
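The realloc.go change is the core of the fix. ReallocResource used to take the workload lock first and only acquired the node lock later inside doReallocOnNode, while most other paths lock the node before the workload; it now fetches the workload, takes the node lock, and takes the workload lock inside it. A sketch of that node-then-workload hierarchy, with plain mutexes and hypothetical names standing in for the Calcium helpers:

package main

import (
	"context"
	"fmt"
	"sync"
)

// Hypothetical per-resource locks; in the real code these are distributed
// locks created through the store.
var (
	nodeLocks     = map[string]*sync.Mutex{"node1": {}}
	workloadLocks = map[string]*sync.Mutex{"c1": {}}
)

// Simplified stand-ins for Calcium.withNodeLocked / withWorkloadLocked.
func withNodeLocked(ctx context.Context, nodename string, f func(context.Context) error) error {
	nodeLocks[nodename].Lock()
	defer nodeLocks[nodename].Unlock()
	return f(ctx)
}

func withWorkloadLocked(ctx context.Context, id string, f func(context.Context) error) error {
	workloadLocks[id].Lock()
	defer workloadLocks[id].Unlock()
	return f(ctx)
}

func main() {
	ctx := context.Background()
	// Node lock outside, workload lock inside: every caller that needs both
	// follows the same order, so no cycle can form between two callers.
	err := withNodeLocked(ctx, "node1", func(ctx context.Context) error {
		return withWorkloadLocked(ctx, "c1", func(ctx context.Context) error {
			fmt.Println("realloc c1 while holding node1, then c1")
			return nil
		})
	})
	if err != nil {
		fmt.Println("realloc failed:", err)
	}
}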
24 changes: 15 additions & 9 deletions cluster/calcium/realloc_test.go
@@ -97,27 +97,29 @@ func TestRealloc(t *testing.T) {
}
}

store.On("GetWorkloads", mock.Anything, []string{"c1"}).Return(newC1, nil)
store.On("GetWorkload", mock.Anything, "c1").Return(newC1(context.TODO(), nil)[0], nil)

// failed by GetNode
store.On("GetNode", mock.Anything, "node1").Return(nil, types.ErrNoETCD).Once()
err := c.ReallocResource(ctx, newReallocOptions("c1", 0.1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
assert.EqualError(t, err, "ETCD must be set")
store.AssertExpectations(t)
store.On("GetNode", mock.Anything, "node1").Return(node1, nil)

// failed by lock
store.On("CreateLock", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
err := c.ReallocResource(ctx, newReallocOptions("c1", -1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
err = c.ReallocResource(ctx, newReallocOptions("c1", -1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
assert.EqualError(t, err, "ETCD must be set")
store.AssertExpectations(t)
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)

// failed by newCPU < 0
store.On("GetWorkloads", mock.Anything, []string{"c1"}).Return(newC1, nil)
err = c.ReallocResource(ctx, newReallocOptions("c1", -1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
assert.EqualError(t, err, "limit or request less than 0: bad `CPU` value")
store.AssertExpectations(t)

// failed by GetNode
store.On("GetNode", mock.Anything, "node1").Return(nil, types.ErrNoETCD).Once()
err = c.ReallocResource(ctx, newReallocOptions("c1", 0.1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
assert.EqualError(t, err, "ETCD must be set")
store.AssertExpectations(t)

// failed by no enough mem
store.On("GetNode", mock.Anything, "node1").Return(node1, nil)
simpleMockScheduler := &schedulermocks.Scheduler{}
scheduler.InitSchedulerV1(simpleMockScheduler)
c.scheduler = simpleMockScheduler
@@ -165,6 +167,7 @@ func TestRealloc(t *testing.T) {
},
Engine: engine,
}
store.On("GetWorkload", mock.Anything, "c2").Return(newC2(nil, nil)[0], nil)
store.On("GetWorkloads", mock.Anything, []string{"c2"}).Return(newC2, nil)
err = c.ReallocResource(ctx, newReallocOptions("c2", 0.1, 2*int64(units.MiB), nil, types.TriKeep, types.TriKeep))
assert.EqualError(t, err, "workload ID must be length of 64")
@@ -251,6 +254,7 @@ func TestRealloc(t *testing.T) {
simpleMockScheduler.On("ReselectCPUNodes", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(scheduleInfos[0], nodeCPUPlans, 2, nil)
simpleMockScheduler.On("ReselectVolumeNodes", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(scheduleInfos[0], nodeVolumePlans, 2, nil).Once()
store.On("GetNode", mock.Anything, "node2").Return(node2, nil)
store.On("GetWorkload", mock.Anything, "c3").Return(c3, nil)
store.On("GetWorkloads", mock.Anything, []string{"c3"}).Return([]*types.Workload{c3}, nil)
store.On("UpdateWorkload", mock.Anything, mock.Anything).Return(types.ErrBadWorkloadID).Times(1)
err = c.ReallocResource(ctx, newReallocOptions("c3", 0.1, 2*int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data0:rw:-50"}), types.TriKeep, types.TriKeep))
@@ -341,6 +345,7 @@ func TestReallocBindCpu(t *testing.T) {
}

store.On("GetNode", mock.Anything, "node3").Return(node3, nil)
store.On("GetWorkload", mock.Anything, "c5").Return(c5, nil)
store.On("GetWorkloads", mock.Anything, []string{"c5"}).Return([]*types.Workload{c5}, nil)
engine.On("VirtualizationUpdateResource", mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("UpdateWorkload", mock.Anything, mock.Anything).Return(nil)
@@ -358,6 +363,7 @@ func TestReallocBindCpu(t *testing.T) {
assert.Equal(t, 0, len(c5.ResourceMeta.CPU))
store.AssertExpectations(t)

store.On("GetWorkload", mock.Anything, "c6").Return(c6, nil)
store.On("GetWorkloads", mock.Anything, []string{"c6"}).Return([]*types.Workload{c6}, nil)
err = c.ReallocResource(ctx, newReallocOptions("c6", 0.1, 2*int64(units.MiB), nil, types.TriTrue, types.TriKeep))
assert.NoError(t, err)
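The test updates mostly add GetWorkload expectations and move the GetNode and CreateLock failure cases so they fire in the new acquisition order. They rely on testify's pattern of stacking a .Once() failure in front of a permanent success expectation; a sketch of that pattern with a hypothetical Store mock (the real tests use the project's generated store mocks):

package main

import (
	"errors"
	"fmt"

	"github.com/stretchr/testify/mock"
)

// Store is a minimal mock standing in for the project's store mock.
type Store struct{ mock.Mock }

func (s *Store) GetNode(name string) (string, error) {
	args := s.Called(name)
	return args.String(0), args.Error(1)
}

func main() {
	s := &Store{}
	errNoETCD := errors.New("ETCD must be set")

	// The first call fails once; later calls succeed. This is the same
	// stacking the realloc tests use for GetNode and CreateLock.
	s.On("GetNode", "node1").Return("", errNoETCD).Once()
	s.On("GetNode", "node1").Return("node1", nil)

	if _, err := s.GetNode("node1"); err != nil {
		fmt.Println("first call:", err) // first call: ETCD must be set
	}
	name, _ := s.GetNode("node1")
	fmt.Println("second call:", name) // second call: node1
}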
3 changes: 2 additions & 1 deletion scheduler/complex/potassium.go
@@ -124,6 +124,7 @@ func (m *Potassium) SelectCPUNodes(ctx context.Context, scheduleInfos []resource

// ReselectCPUNodes used for realloc one container with cpu affinity
func (m *Potassium) ReselectCPUNodes(ctx context.Context, scheduleInfo resourcetypes.ScheduleInfo, CPU types.CPUMap, quota float64, memory int64) (resourcetypes.ScheduleInfo, map[string][]types.CPUMap, int, error) {
log.Infof(ctx, "[SelectCPUNodes] scheduleInfo %v, need cpu %f, need memory %d, existing %v", scheduleInfo, quota, memory, CPU)
var affinityPlan types.CPUMap
// remaining quota that's impossible to achieve affinity
if scheduleInfo, quota, affinityPlan = cpuReallocPlan(scheduleInfo, quota, CPU, int64(m.sharebase)); quota == 0 {
@@ -325,7 +326,7 @@ func (m *Potassium) SelectVolumeNodes(ctx context.Context, scheduleInfos []resou

// ReselectVolumeNodes is used for realloc only
func (m *Potassium) ReselectVolumeNodes(ctx context.Context, scheduleInfo resourcetypes.ScheduleInfo, existing types.VolumePlan, vbsReq types.VolumeBindings) (resourcetypes.ScheduleInfo, map[string][]types.VolumePlan, int, error) {

log.Infof(ctx, "[ReselectVolumeNodes] scheduleInfo %v, need volume: %v, existing %v", scheduleInfo, vbsReq.ToStringSlice(true, true), existing.ToLiteral())
affinityPlan := types.VolumePlan{}
needReschedule := types.VolumeBindings{}
norm, mono, unlim := distinguishVolumeBindings(vbsReq)