diff --git a/cluster/calcium/lock.go b/cluster/calcium/lock.go
index d93293c5d..1c0b48c2b 100644
--- a/cluster/calcium/lock.go
+++ b/cluster/calcium/lock.go
@@ -3,6 +3,7 @@ package calcium
 import (
 	"context"
 	"fmt"
+	"sort"
 	"time"
 
 	"github.com/pkg/errors"
@@ -37,11 +38,18 @@ func (c *Calcium) doUnlock(ctx context.Context, lock lock.DistributedLock, msg s
 	return errors.WithStack(lock.Unlock(ctx))
 }
 
-func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.DistributedLock) {
-	for n, lock := range locks {
-		// force unlock
-		if err := c.doUnlock(ctx, lock, n); err != nil {
-			log.Errorf(ctx, "[doUnlockAll] Unlock failed %v", err)
+func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.DistributedLock, order ...string) {
+	// unlock in the reverse order
+	if len(order) != len(locks) {
+		log.Warn(ctx, "[doUnlockAll] order length not match lock map")
+		order = []string{}
+		for key := range locks {
+			order = append(order, key)
+		}
+	}
+	for _, key := range order {
+		if err := c.doUnlock(ctx, locks[key], key); err != nil {
+			log.Errorf(ctx, "[doUnlockAll] Unlock %s failed %v", key, err)
 			continue
 		}
 	}
@@ -72,8 +80,16 @@ func (c *Calcium) withNodeLocked(ctx context.Context, nodename string, f func(co
 func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(context.Context, map[string]*types.Workload) error) error {
 	workloads := map[string]*types.Workload{}
 	locks := map[string]lock.DistributedLock{}
+
+	// sort + unique
+	sort.Strings(ids)
+	ids = ids[:utils.Unique(ids, func(i int) string { return ids[i] })]
+
 	defer log.Debugf(ctx, "[withWorkloadsLocked] Workloads %+v unlocked", ids)
-	defer func() { c.doUnlockAll(utils.InheritTracingInfo(ctx, context.Background()), locks) }()
+	defer func() {
+		utils.Reverse(ids)
+		c.doUnlockAll(utils.InheritTracingInfo(ctx, context.TODO()), locks, ids...)
+	}()
 	cs, err := c.GetWorkloads(ctx, ids)
 	if err != nil {
 		return err
@@ -94,10 +110,14 @@ func (c *Calcium) withWorkloadsLocked(ctx context.Context, ids []string, f func(
 // withNodesLocked will using NodeFilter `nf` to filter nodes
 // and lock the corresponding nodes for the callback function `f` to use
 func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f func(context.Context, map[string]*types.Node) error) error {
+	nodenames := []string{}
 	nodes := map[string]*types.Node{}
 	locks := map[string]lock.DistributedLock{}
 	defer log.Debugf(ctx, "[withNodesLocked] Nodes %+v unlocked", nf)
-	defer c.doUnlockAll(utils.InheritTracingInfo(ctx, context.Background()), locks)
+	defer func() {
+		utils.Reverse(nodenames)
+		c.doUnlockAll(utils.InheritTracingInfo(ctx, context.TODO()), locks, nodenames...)
+	}()
 
 	ns, err := c.filterNodes(ctx, nf)
 	if err != nil {
@@ -118,6 +138,7 @@ func (c *Calcium) withNodesLocked(ctx context.Context, nf types.NodeFilter, f fu
 			return err
 		}
 		nodes[n.Name] = node
+		nodenames = append(nodenames, n.Name)
 	}
 	return f(ctx, nodes)
 }
diff --git a/cluster/calcium/node.go b/cluster/calcium/node.go
index 34474c66e..ebe802a73 100644
--- a/cluster/calcium/node.go
+++ b/cluster/calcium/node.go
@@ -2,6 +2,7 @@ package calcium
 
 import (
 	"context"
+	"sort"
 
 	"github.com/pkg/errors"
 	"github.com/projecteru2/core/log"
@@ -176,8 +177,16 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
 // filterNodes filters nodes using NodeFilter nf
 // the filtering logic is introduced along with NodeFilter
 // NOTE: when nf.Includes is set, they don't need to belong to podname
-func (c *Calcium) filterNodes(ctx context.Context, nf types.NodeFilter) ([]*types.Node, error) {
-	ns := []*types.Node{}
+// update on 2021-06-21: sort and unique locks to avoid deadlock
+func (c *Calcium) filterNodes(ctx context.Context, nf types.NodeFilter) (ns []*types.Node, err error) {
+	defer func() {
+		if len(ns) == 0 {
+			return
+		}
+		sort.Slice(ns, func(i, j int) bool { return ns[i].Name <= ns[j].Name })
+		// unique
+		ns = ns[:utils.Unique(ns, func(i int) string { return ns[i].Name })]
+	}()
 
 	if len(nf.Includes) != 0 {
 		for _, nodename := range nf.Includes {
diff --git a/cluster/calcium/realloc.go b/cluster/calcium/realloc.go
index 80ddb1b39..4dd6a68f2 100644
--- a/cluster/calcium/realloc.go
+++ b/cluster/calcium/realloc.go
@@ -16,94 +16,98 @@ import (
 
 // ReallocResource updates workload resource dynamically
 func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOptions) (err error) {
 	logger := log.WithField("Calcium", "ReallocResource").WithField("opts", opts)
-	return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error {
-		rrs, err := resources.MakeRequests(
-			types.ResourceOptions{
-				CPUQuotaRequest: utils.Round(workload.CPUQuotaRequest + opts.ResourceOpts.CPUQuotaRequest),
-				CPUQuotaLimit:   utils.Round(workload.CPUQuotaLimit + opts.ResourceOpts.CPUQuotaLimit),
-				CPUBind:         types.ParseTriOption(opts.CPUBindOpts, len(workload.CPU) > 0),
-				CPU:             workload.CPU,
-				MemoryRequest:   workload.MemoryRequest + opts.ResourceOpts.MemoryRequest,
-				MemoryLimit:     workload.MemoryLimit + opts.ResourceOpts.MemoryLimit,
-				StorageRequest:  workload.StorageRequest + opts.ResourceOpts.StorageRequest,
-				StorageLimit:    workload.StorageLimit + opts.ResourceOpts.StorageLimit,
-				VolumeRequest:   types.MergeVolumeBindings(workload.VolumeRequest, opts.ResourceOpts.VolumeRequest),
-				VolumeLimit:     types.MergeVolumeBindings(workload.VolumeLimit, opts.ResourceOpts.VolumeLimit),
-				VolumeExist:     workload.VolumePlanRequest,
-			},
-		)
-		if err != nil {
-			return logger.Err(ctx, err)
-		}
-		return logger.Err(ctx, c.doReallocOnNode(ctx, workload.Nodename, workload, rrs))
+	workload, err := c.GetWorkload(ctx, opts.ID)
+	if err != nil {
+		return
+	}
+	return c.withNodeLocked(ctx, workload.Nodename, func(ctx context.Context, node *types.Node) error {
+		return c.withWorkloadLocked(ctx, opts.ID, func(ctx context.Context, workload *types.Workload) error {
+			rrs, err := resources.MakeRequests(
+				types.ResourceOptions{
+					CPUQuotaRequest: utils.Round(workload.CPUQuotaRequest + opts.ResourceOpts.CPUQuotaRequest),
+					CPUQuotaLimit:   utils.Round(workload.CPUQuotaLimit + opts.ResourceOpts.CPUQuotaLimit),
+					CPUBind:         types.ParseTriOption(opts.CPUBindOpts, len(workload.CPU) > 0),
+					CPU:             workload.CPU,
+					MemoryRequest:   workload.MemoryRequest + opts.ResourceOpts.MemoryRequest,
+					MemoryLimit:     workload.MemoryLimit + opts.ResourceOpts.MemoryLimit,
+					StorageRequest:  workload.StorageRequest + opts.ResourceOpts.StorageRequest,
+					StorageLimit:    workload.StorageLimit + opts.ResourceOpts.StorageLimit,
+					VolumeRequest:   types.MergeVolumeBindings(workload.VolumeRequest, opts.ResourceOpts.VolumeRequest),
+					VolumeLimit:     types.MergeVolumeBindings(workload.VolumeLimit, opts.ResourceOpts.VolumeLimit),
+					VolumeExist:     workload.VolumePlanRequest,
+				},
+			)
+			if err != nil {
+				return logger.Err(ctx, err)
+			}
+			return logger.Err(ctx, c.doReallocOnNode(ctx, node, workload, rrs))
+		})
 	})
 }
 
 // transaction: node resource
-func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, workload *types.Workload, rrs resourcetypes.ResourceRequests) error {
-	return c.withNodeLocked(ctx, nodename, func(ctx context.Context, node *types.Node) (err error) {
-		node.RecycleResources(&workload.ResourceMeta)
-		plans, err := resources.SelectNodesByResourceRequests(ctx, rrs, map[string]*types.Node{node.Name: node})
-		if err != nil {
-			return err
-		}
+func (c *Calcium) doReallocOnNode(ctx context.Context, node *types.Node, workload *types.Workload, rrs resourcetypes.ResourceRequests) (err error) {
+	node.RecycleResources(&workload.ResourceMeta)
+	plans, err := resources.SelectNodesByResourceRequests(ctx, rrs, map[string]*types.Node{node.Name: node})
+	if err != nil {
+		return err
+	}
 
-		originalWorkload := *workload
-		resourceMeta := &types.ResourceMeta{}
-		if err = utils.Txn(
-			ctx,
+	originalWorkload := *workload
+	resourceMeta := &types.ResourceMeta{}
+	if err = utils.Txn(
+		ctx,
 
-			// if update workload resources
-			func(ctx context.Context) (err error) {
-				resourceMeta := &types.ResourceMeta{}
-				for _, plan := range plans {
-					if resourceMeta, err = plan.Dispense(resourcetypes.DispenseOptions{
-						Node: node,
-					}, resourceMeta); err != nil {
-						return err
-					}
+		// if update workload resources
+		func(ctx context.Context) (err error) {
+			resourceMeta := &types.ResourceMeta{}
+			for _, plan := range plans {
+				if resourceMeta, err = plan.Dispense(resourcetypes.DispenseOptions{
+					Node: node,
+				}, resourceMeta); err != nil {
+					return err
 				}
+			}
 
-				return c.doReallocWorkloadsOnInstance(ctx, node.Engine, resourceMeta, workload)
-			},
-			// then commit changes
-			func(ctx context.Context) error {
-				for _, plan := range plans {
-					plan.ApplyChangesOnNode(node, 0)
-				}
-				return errors.WithStack(c.store.UpdateNodes(ctx, node))
-			},
-			// no need rollback
-			func(ctx context.Context, failureByCond bool) (err error) {
-				if failureByCond {
-					return
-				}
-				r := &types.ResourceMeta{
-					CPUQuotaRequest:   originalWorkload.CPUQuotaRequest,
-					CPUQuotaLimit:     originalWorkload.CPUQuotaLimit,
-					CPU:               originalWorkload.CPU,
-					NUMANode:          originalWorkload.NUMANode,
-					MemoryRequest:     originalWorkload.MemoryRequest,
-					MemoryLimit:       originalWorkload.MemoryLimit,
-					VolumeRequest:     originalWorkload.VolumeRequest,
-					VolumeLimit:       originalWorkload.VolumeLimit,
-					VolumePlanRequest: originalWorkload.VolumePlanRequest,
-					VolumePlanLimit:   originalWorkload.VolumePlanLimit,
-					VolumeChanged:     resourceMeta.VolumeChanged,
-					StorageRequest:    originalWorkload.StorageRequest,
-					StorageLimit:      originalWorkload.StorageLimit,
-				}
-				return c.doReallocWorkloadsOnInstance(ctx, node.Engine, r, workload)
-			},
+			return c.doReallocWorkloadsOnInstance(ctx, node.Engine, resourceMeta, workload)
+		},
+		// then commit changes
+		func(ctx context.Context) error {
+			for _, plan := range plans {
+				plan.ApplyChangesOnNode(node, 0)
+			}
+			return errors.WithStack(c.store.UpdateNodes(ctx, node))
+		},
+		// no need rollback
+		func(ctx context.Context, failureByCond bool) (err error) {
+			if failureByCond {
+				return
+			}
+			r := &types.ResourceMeta{
+				CPUQuotaRequest:   originalWorkload.CPUQuotaRequest,
+				CPUQuotaLimit:     originalWorkload.CPUQuotaLimit,
+				CPU:               originalWorkload.CPU,
+				NUMANode:          originalWorkload.NUMANode,
+				MemoryRequest:     originalWorkload.MemoryRequest,
+				MemoryLimit:       originalWorkload.MemoryLimit,
+				VolumeRequest:     originalWorkload.VolumeRequest,
+				VolumeLimit:       originalWorkload.VolumeLimit,
+				VolumePlanRequest: originalWorkload.VolumePlanRequest,
+				VolumePlanLimit:   originalWorkload.VolumePlanLimit,
+				VolumeChanged:     resourceMeta.VolumeChanged,
+				StorageRequest:    originalWorkload.StorageRequest,
+				StorageLimit:      originalWorkload.StorageLimit,
+			}
+			return c.doReallocWorkloadsOnInstance(ctx, node.Engine, r, workload)
+		},
 
-			c.config.GlobalTimeout,
-		); err != nil {
-			return
-		}
+		c.config.GlobalTimeout,
+	); err != nil {
+		return
+	}
 
-		c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
-		return nil
-	})
+	c.doRemapResourceAndLog(ctx, log.WithField("Calcium", "doReallocOnNode"), node)
+	return nil
 }
 
 func (c *Calcium) doReallocWorkloadsOnInstance(ctx context.Context, engine engine.API, resourceMeta *types.ResourceMeta, workload *types.Workload) (err error) {
diff --git a/cluster/calcium/realloc_test.go b/cluster/calcium/realloc_test.go
index 7bd940af7..62e22cac3 100644
--- a/cluster/calcium/realloc_test.go
+++ b/cluster/calcium/realloc_test.go
@@ -97,27 +97,29 @@ func TestRealloc(t *testing.T) {
 		}
 	}
 
-	store.On("GetWorkloads", mock.Anything, []string{"c1"}).Return(newC1, nil)
+	store.On("GetWorkload", mock.Anything, "c1").Return(newC1(context.TODO(), nil)[0], nil)
+
+	// failed by GetNode
+	store.On("GetNode", mock.Anything, "node1").Return(nil, types.ErrNoETCD).Once()
+	err := c.ReallocResource(ctx, newReallocOptions("c1", 0.1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
+	assert.EqualError(t, err, "ETCD must be set")
+	store.AssertExpectations(t)
+	store.On("GetNode", mock.Anything, "node1").Return(node1, nil)
+
 	// failed by lock
 	store.On("CreateLock", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
-	err := c.ReallocResource(ctx, newReallocOptions("c1", -1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
+	err = c.ReallocResource(ctx, newReallocOptions("c1", -1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
 	assert.EqualError(t, err, "ETCD must be set")
 	store.AssertExpectations(t)
 	store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
 
 	// failed by newCPU < 0
+	store.On("GetWorkloads", mock.Anything, []string{"c1"}).Return(newC1, nil)
 	err = c.ReallocResource(ctx, newReallocOptions("c1", -1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
 	assert.EqualError(t, err, "limit or request less than 0: bad `CPU` value")
 	store.AssertExpectations(t)
 
-	// failed by GetNode
-	store.On("GetNode", mock.Anything, "node1").Return(nil, types.ErrNoETCD).Once()
-	err = c.ReallocResource(ctx, newReallocOptions("c1", 0.1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
-	assert.EqualError(t, err, "ETCD must be set")
-	store.AssertExpectations(t)
-
 	// failed by no enough mem
-	store.On("GetNode", mock.Anything, "node1").Return(node1, nil)
 	simpleMockScheduler := &schedulermocks.Scheduler{}
 	scheduler.InitSchedulerV1(simpleMockScheduler)
 	c.scheduler = simpleMockScheduler
@@ -165,6 +167,7 @@ func TestRealloc(t *testing.T) {
 		},
 		Engine: engine,
 	}
+	store.On("GetWorkload", mock.Anything, "c2").Return(newC2(nil, nil)[0], nil)
 	store.On("GetWorkloads", mock.Anything, []string{"c2"}).Return(newC2, nil)
 	err = c.ReallocResource(ctx, newReallocOptions("c2", 0.1, 2*int64(units.MiB), nil, types.TriKeep, types.TriKeep))
 	assert.EqualError(t, err, "workload ID must be length of 64")
@@ -251,6 +254,7 @@ func TestRealloc(t *testing.T) {
 	simpleMockScheduler.On("ReselectCPUNodes", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(scheduleInfos[0], nodeCPUPlans, 2, nil)
 	simpleMockScheduler.On("ReselectVolumeNodes", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(scheduleInfos[0], nodeVolumePlans, 2, nil).Once()
 	store.On("GetNode", mock.Anything, "node2").Return(node2, nil)
+	store.On("GetWorkload", mock.Anything, "c3").Return(c3, nil)
 	store.On("GetWorkloads", mock.Anything, []string{"c3"}).Return([]*types.Workload{c3}, nil)
 	store.On("UpdateWorkload", mock.Anything, mock.Anything).Return(types.ErrBadWorkloadID).Times(1)
 	err = c.ReallocResource(ctx, newReallocOptions("c3", 0.1, 2*int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data0:rw:-50"}), types.TriKeep, types.TriKeep))
@@ -341,6 +345,7 @@ func TestReallocBindCpu(t *testing.T) {
 	}
 
 	store.On("GetNode", mock.Anything, "node3").Return(node3, nil)
+	store.On("GetWorkload", mock.Anything, "c5").Return(c5, nil)
 	store.On("GetWorkloads", mock.Anything, []string{"c5"}).Return([]*types.Workload{c5}, nil)
 	engine.On("VirtualizationUpdateResource", mock.Anything, mock.Anything, mock.Anything).Return(nil)
 	store.On("UpdateWorkload", mock.Anything, mock.Anything).Return(nil)
@@ -358,6 +363,7 @@ func TestReallocBindCpu(t *testing.T) {
 	assert.Equal(t, 0, len(c5.ResourceMeta.CPU))
 	store.AssertExpectations(t)
 
+	store.On("GetWorkload", mock.Anything, "c6").Return(c6, nil)
 	store.On("GetWorkloads", mock.Anything, []string{"c6"}).Return([]*types.Workload{c6}, nil)
 	err = c.ReallocResource(ctx, newReallocOptions("c6", 0.1, 2*int64(units.MiB), nil, types.TriTrue, types.TriKeep))
 	assert.NoError(t, err)
diff --git a/scheduler/complex/potassium.go b/scheduler/complex/potassium.go
index 78d494fd4..8eb88193d 100644
--- a/scheduler/complex/potassium.go
+++ b/scheduler/complex/potassium.go
@@ -124,6 +124,7 @@ func (m *Potassium) SelectCPUNodes(ctx context.Context, scheduleInfos []resource
 
 // ReselectCPUNodes used for realloc one container with cpu affinity
 func (m *Potassium) ReselectCPUNodes(ctx context.Context, scheduleInfo resourcetypes.ScheduleInfo, CPU types.CPUMap, quota float64, memory int64) (resourcetypes.ScheduleInfo, map[string][]types.CPUMap, int, error) {
+	log.Infof(ctx, "[ReselectCPUNodes] scheduleInfo %v, need cpu %f, need memory %d, existing %v", scheduleInfo, quota, memory, CPU)
 	var affinityPlan types.CPUMap
 	// remaining quota that's impossible to achieve affinity
 	if scheduleInfo, quota, affinityPlan = cpuReallocPlan(scheduleInfo, quota, CPU, int64(m.sharebase)); quota == 0 {
@@ -325,7 +326,7 @@ func (m *Potassium) SelectVolumeNodes(ctx context.Context, scheduleInfos []resou
 
 // ReselectVolumeNodes is used for realloc only
 func (m *Potassium) ReselectVolumeNodes(ctx context.Context, scheduleInfo resourcetypes.ScheduleInfo, existing types.VolumePlan, vbsReq types.VolumeBindings) (resourcetypes.ScheduleInfo, map[string][]types.VolumePlan, int, error) {
-
+	log.Infof(ctx, "[ReselectVolumeNodes] scheduleInfo %v, need volume: %v, existing %v", scheduleInfo, vbsReq.ToStringSlice(true, true), existing.ToLiteral())
 	affinityPlan := types.VolumePlan{}
 	needReschedule := types.VolumeBindings{}
 	norm, mono, unlim := distinguishVolumeBindings(vbsReq)
diff --git a/utils/utils.go b/utils/utils.go
index 2f6e381c6..71081a45d 100644
--- a/utils/utils.go
+++ b/utils/utils.go
@@ -10,6 +10,7 @@ import (
 	"io/ioutil"
 	"math/big"
 	"os"
+	"reflect"
 	"strings"
 
 	"github.com/pkg/errors"
@@ -285,3 +286,29 @@ func safeSplit(s string) []string {
 	}
 	return result
 }
+
+// Reverse any slice
+func Reverse(s interface{}) {
+	n := reflect.ValueOf(s).Len()
+	swap := reflect.Swapper(s)
+	for i, j := 0, n-1; i < j; i, j = i+1, j-1 {
+		swap(i, j)
+	}
+}
+
+// Unique returns an index, where s[:index] is a unique slice
+// Unique requires a sorted slice
+func Unique(s interface{}, getVal func(int) string) (j int) {
+	n := reflect.ValueOf(s).Len()
+	swap := reflect.Swapper(s)
+	lastVal := ""
+	for i := 0; i < n; i++ {
+		if getVal(i) == lastVal && i != 0 {
+			continue
+		}
+		lastVal = getVal(i)
+		swap(i, j)
+		j++
+	}
+	return j
+}
diff --git a/utils/utils_test.go b/utils/utils_test.go
index e149b39a3..c25c92f73 100644
--- a/utils/utils_test.go
+++ b/utils/utils_test.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"os"
+	"reflect"
 	"strconv"
 	"testing"
 
@@ -232,3 +233,30 @@ func TestRange(t *testing.T) {
 		assert.Equal(t, i, res[i])
 	}
 }
+
+func TestReverse(t *testing.T) {
+	s1 := []string{"a", "b", "c"}
+	Reverse(s1)
+	assert.True(t, reflect.DeepEqual(s1, []string{"c", "b", "a"}))
+
+	s2 := []string{}
+	Reverse(s2)
+
+	s3 := []int{1, 2, 3, 4}
+	Reverse(s3)
+	assert.True(t, reflect.DeepEqual(s3, []int{4, 3, 2, 1}))
+}
+
+func TestUnique(t *testing.T) {
+	s1 := []int64{1, 2, 3}
+	s1 = s1[:Unique(s1, func(i int) string { return strconv.Itoa(int(s1[i])) })]
+	assert.True(t, reflect.DeepEqual(s1, []int64{1, 2, 3}))
+
+	s2 := []string{"a", "a", "a", "b", "b", "c"}
+	s2 = s2[:Unique(s2, func(i int) string { return s2[i] })]
+	assert.True(t, reflect.DeepEqual(s2, []string{"a", "b", "c"}))
+
+	s3 := []string{"", "1", "1", "1", "1"}
+	s3 = s3[:Unique(s3, func(i int) string { return s3[i] })]
+	assert.True(t, reflect.DeepEqual(s3, []string{"", "1"}))
+}
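The core idea of the patch is classic deadlock avoidance: every caller acquires its locks in one global order (keys sorted and deduplicated) and releases them in reverse. The sketch below shows that pattern in isolation; it is not eru-core code — `acquireAll`, `releaseAll`, and the plain `sync.Mutex` stand-ins for the etcd-backed `lock.DistributedLock` are illustrative names only.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// acquireAll takes the locks for the given keys in a single global order
// (sorted, deduplicated), mirroring what withWorkloadsLocked/withNodesLocked
// now do before locking, so callers with overlapping key sets cannot
// deadlock each other.
func acquireAll(locks map[string]*sync.Mutex, keys []string) []string {
	sort.Strings(keys)
	order := keys[:0]
	last := ""
	for i, k := range keys {
		if i == 0 || k != last {
			order = append(order, k)
		}
		last = k
	}
	for _, k := range order {
		locks[k].Lock()
	}
	return order // acquisition order, kept so release can walk it backwards
}

// releaseAll unlocks in reverse acquisition order, like doUnlockAll does with
// its new variadic order argument.
func releaseAll(locks map[string]*sync.Mutex, order []string) {
	for i := len(order) - 1; i >= 0; i-- {
		locks[order[i]].Unlock()
	}
}

func main() {
	locks := map[string]*sync.Mutex{"node1": {}, "node2": {}}
	// unordered, duplicated input is fine: it is sorted and deduplicated first
	order := acquireAll(locks, []string{"node2", "node1", "node2"})
	fmt.Println("locked in order:", order)
	releaseAll(locks, order)
}
```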