Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize global strategy #545

Merged
merged 1 commit into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 56 additions & 35 deletions strategy/global.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,47 @@
package strategy

import (
"container/heap"
"context"
"fmt"
"sort"

"github.com/pkg/errors"

"github.com/projecteru2/core/log"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"

"github.com/pkg/errors"
)

type infoHeapForGlobalStrategy []Info

// Len .
func (r infoHeapForGlobalStrategy) Len() int {
return len(r)
}

// Less .
func (r infoHeapForGlobalStrategy) Less(i, j int) bool {
return (r[i].Usage + r[i].Rate) < (r[j].Usage + r[j].Rate)
}

// Swap .
func (r infoHeapForGlobalStrategy) Swap(i, j int) {
r[i], r[j] = r[j], r[i]
}

// Push .
func (r *infoHeapForGlobalStrategy) Push(x interface{}) {
*r = append(*r, x.(Info))
}

// Pop .
func (r *infoHeapForGlobalStrategy) Pop() interface{} {
old := *r
n := len(old)
x := old[n-1]
*r = old[:n-1]
return x
}

// GlobalPlan 基于全局资源配额
// 尽量使得资源消耗平均
func GlobalPlan(ctx context.Context, infos []Info, need, total, _ int) (map[string]int, error) {
Expand All @@ -21,40 +51,31 @@ func GlobalPlan(ctx context.Context, infos []Info, need, total, _ int) (map[stri
}
strategyInfos := make([]Info, len(infos))
copy(strategyInfos, infos)
sort.Slice(infos, func(i, j int) bool { return infos[i].Capacity > infos[j].Capacity })
length := len(strategyInfos)
i := 0

deployMap := make(map[string]int)
for need > 0 {
p := i
deploy := 0
delta := 0.0
if i < length-1 {
delta = utils.Round(strategyInfos[i+1].Usage - strategyInfos[i].Usage)
i++
deployMap := map[string]int{}

infoHeap := &infoHeapForGlobalStrategy{}
for _, info := range strategyInfos {
if info.Capacity > 0 {
infoHeap.Push(info)
}
for j := 0; j <= p && need > 0 && delta >= 0; j++ {
// 减枝
if strategyInfos[j].Capacity == 0 {
continue
}
cost := utils.Round(strategyInfos[j].Rate)
deploy = int(delta / cost)
if deploy == 0 {
deploy = 1
}
if deploy > strategyInfos[j].Capacity {
deploy = strategyInfos[j].Capacity
}
if deploy > need {
deploy = need
}
strategyInfos[j].Capacity -= deploy
deployMap[strategyInfos[j].Nodename] += deploy
need -= deploy
}
heap.Init(infoHeap)

for i := 0; i < need; i++ {
if infoHeap.Len() == 0 {
return nil, errors.WithStack(types.NewDetailedErr(types.ErrInsufficientRes,
fmt.Sprintf("need: %d, available: %d", need, i)))
}
infoWithMinUsage := heap.Pop(infoHeap).(Info)
deployMap[infoWithMinUsage.Nodename]++
infoWithMinUsage.Usage += infoWithMinUsage.Rate
infoWithMinUsage.Capacity--
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个确认一下会不会修改入参 infos 里的值, strategy 函数都应该是无副作用的, 而且可能会多次调用, 不应该在运行之后修改入参.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里因为上面有个copy,所以是无副作用的。(不过有个sort.Slice,会改变Info的顺序)

CommunismPlan里没有直接调用copy,但是在初始化heap的时候事实上相当于copy了一下,所以也是无副作用的。

简单看了一下历史版本,在去年1月份之前的版本里可能会产生副作用,不知道当时的情况是在哪个版本发生的...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

在什么情况下会多次调用呢?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

历史上有个两次调用的版本, 不过后来删掉了.


if infoWithMinUsage.Capacity > 0 {
heap.Push(infoHeap, infoWithMinUsage)
}
}

// 这里 need 一定会为 0 出来,因为 volTotal 保证了一定大于 need
// 这里并不需要再次排序了,理论上的排序是基于资源使用率得到的 Deploy 最终方案
log.Debugf(ctx, "[GlobalPlan] strategyInfos: %v", strategyInfos)
Expand Down
133 changes: 112 additions & 21 deletions strategy/global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/projecteru2/core/types"
)

func TestGlobalPlan1(t *testing.T) {
// normal case
n1 := Info{
Nodename: "n1",
Usage: 0.8,
Expand All @@ -17,64 +20,152 @@ func TestGlobalPlan1(t *testing.T) {
n2 := Info{
Nodename: "n2",
Usage: 0.5,
Rate: 0.11,
Capacity: 1,
Rate: 0.12,
Capacity: 2,
}
n3 := Info{
Nodename: "n3",
Usage: 2.2,
Rate: 0.05,
Capacity: 1,
}
arg := []Info{n3, n2, n1}
arg := []Info{n1, n2, n3}
r, err := GlobalPlan(context.TODO(), arg, 3, 100, 0)
assert.NoError(t, err)
assert.Equal(t, r[arg[0].Nodename], 1)
}
assert.Equal(t, r, map[string]int{"n1": 1, "n2": 2})

func TestGlobalPlan2(t *testing.T) {
n1 := Info{
// normal case 2
n1 = Info{
Nodename: "n1",
Usage: 0.8,
Rate: 0.05,
Capacity: 4,
}
n2 = Info{
Nodename: "n2",
Usage: 0.5,
Rate: 0.35,
Capacity: 1,
}
n3 = Info{
Nodename: "n3",
Usage: 2.2,
Rate: 0.05,
Capacity: 1,
}
arg = []Info{n1, n2, n3}
r, err = GlobalPlan(context.TODO(), arg, 3, 100, 0)
assert.Equal(t, r, map[string]int{"n1": 2, "n2": 1})

// insufficient total
n1 = Info{
Nodename: "n1",
Usage: 0.8,
Rate: 0.05,
Capacity: 4,
}
n2 = Info{
Nodename: "n2",
Usage: 0.5,
Rate: 0.35,
Capacity: 1,
}
n3 = Info{
Nodename: "n3",
Usage: 2.2,
Rate: 0.05,
Capacity: 1,
}
arg = []Info{n1, n2, n3}
r, err = GlobalPlan(context.TODO(), arg, 100, 6, 0)
assert.ErrorIs(t, err, types.ErrInsufficientRes)

// fake total
n1 = Info{
Nodename: "n1",
Usage: 0.8,
Rate: 0.05,
Capacity: 4,
}
n2 = Info{
Nodename: "n2",
Usage: 0.5,
Rate: 0.35,
Capacity: 1,
}
n3 = Info{
Nodename: "n3",
Usage: 2.2,
Rate: 0.05,
Capacity: 1,
}
arg = []Info{n1, n2, n3}
r, err = GlobalPlan(context.TODO(), arg, 10, 100, 0)
assert.ErrorIs(t, err, types.ErrInsufficientRes)

// small rate
n1 = Info{
Nodename: "n1",
Usage: 0.8,
Rate: 0,
Capacity: 1e10,
}
n2 = Info{
Nodename: "n2",
Usage: 0.5,
Rate: 0,
Capacity: 1e10,
}
n3 = Info{
Nodename: "n3",
Usage: 2.2,
Rate: 0,
Capacity: 1e10,
}
arg = []Info{n1, n2, n3}
r, err = GlobalPlan(context.TODO(), arg, 10, 100, 0)
assert.NoError(t, err)
assert.Equal(t, r, map[string]int{"n2": 10})

// old test case 2
n1 = Info{
Nodename: "n1",
Usage: 1.6,
Rate: 0.05,
Capacity: 100,
}
n2 := Info{
n2 = Info{
Nodename: "n2",
Usage: 0.5,
Rate: 0.11,
Capacity: 100,
}
arg := []Info{n2, n1}
r, err := GlobalPlan(context.TODO(), arg, 2, 100, 0)
arg = []Info{n2, n1}
r, err = GlobalPlan(context.TODO(), arg, 2, 100, 0)
assert.NoError(t, err)
assert.Equal(t, r[arg[0].Nodename], 2)
}
assert.Equal(t, r, map[string]int{"n2": 2})

func TestGlobalPlan3(t *testing.T) {
n1 := Info{
// old test case 3
n1 = Info{
Nodename: "n1",
Usage: 0.5259232954545454,
Rate: 0.0000712,
Capacity: 100,
}

r, err := GlobalPlan(context.TODO(), []Info{n1}, 1, 100, 0)
r, err = GlobalPlan(context.TODO(), []Info{n1}, 1, 100, 0)
assert.NoError(t, err)
assert.Equal(t, r["n1"], 1)
}

func TestGlobal3(t *testing.T) {
_, err := GlobalPlan(context.TODO(), []Info{}, 10, 1, 0)
assert.Error(t, err)
nodeInfo := Info{
// old test case 4
n1 = Info{
Nodename: "n1",
Usage: 1,
Rate: 0.3,
Capacity: 100,
Count: 21,
}
r, err := GlobalPlan(context.TODO(), []Info{nodeInfo}, 10, 100, 0)
r, err = GlobalPlan(context.TODO(), []Info{n1}, 10, 100, 0)
assert.NoError(t, err)
assert.Equal(t, r["n1"], 10)
}
Expand Down