
improve scheduler logs (#446)
* simplify scheduler logs

* add more specific scheduler error info

* improve strategy error info

* fix resource check calculation for monopoly volume

* adjust unit tests to the new errors
jschwinger233 authored Jun 30, 2021
1 parent a598f71 commit c927988
Showing 15 changed files with 96 additions and 31 deletions.
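Most of this commit replaces bare errors.WithStack(types.ErrXXX) returns with errors.Wrap/Wrapf plus a concrete description of the unmet requirement. A minimal sketch of why the updated tests can assert messages like "…: not enough resource" while sentinel matching keeps working — assuming github.com/pkg/errors v0.9+ (which forwards errors.Is/Unwrap) and a local stand-in for types.ErrInsufficientRes:

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

// Local stand-in for types.ErrInsufficientRes; its text matches the
// ": not enough resource" suffix seen in the updated test expectations.
var ErrInsufficientRes = errors.New("not enough resource")

func selectNodes(cpu float64, memory int64) error {
	// Instead of errors.WithStack(ErrInsufficientRes), wrap the sentinel with
	// the concrete requirement that could not be satisfied.
	return errors.Wrapf(ErrInsufficientRes, "no node remains %.2f pieces of cpu and %d bytes of memory at the same time", cpu, memory)
}

func main() {
	err := selectNodes(1.0, 0)
	// no node remains 1.00 pieces of cpu and 0 bytes of memory at the same time: not enough resource
	fmt.Println(err)
	// true: callers matching the sentinel with errors.Is keep working
	fmt.Println(errors.Is(err, ErrInsufficientRes))
}
```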
3 changes: 3 additions & 0 deletions cluster/calcium/capacity.go
@@ -71,5 +71,8 @@ func (c *Calcium) doCalculateCapacity(ctx context.Context, nodeMap map[string]*t
 		total += info.Capacity
 	}
 	log.Debugf(ctx, "[Calcium.doCalculateCapacity] plans: %+v, total: %v", plans, total)
+	if total <= 0 {
+		return 0, nil, nil, errors.Wrap(types.ErrInsufficientRes, "no node meets all the resource requirements at the same time")
+	}
 	return
 }
6 changes: 2 additions & 4 deletions cluster/calcium/capacity_test.go
@@ -95,8 +95,7 @@ func TestCalculateCapacity(t *testing.T) {
 		Capacity: 1,
 	}}, nil, 1, nil).Once()
 	r, err = c.CalculateCapacity(ctx, opts)
-	assert.NoError(t, err)
-	assert.EqualValues(t, 0, r.Total)
+	assert.Error(t, err, "no node meets all the resource requirements at the same time")
 	sched.AssertExpectations(t)
 	store.AssertExpectations(t)

@@ -114,8 +113,7 @@ func TestCalculateCapacity(t *testing.T) {
 		Capacity: 1,
 	}}, nil, 1, nil).Once()
 	r, err = c.CalculateCapacity(ctx, opts)
-	assert.NoError(t, err)
-	assert.EqualValues(t, 0, r.Total)
+	assert.Error(t, err, "no node meets all the resource requirements at the same time")
 	sched.AssertExpectations(t)
 	store.AssertExpectations(t)
 }
3 changes: 2 additions & 1 deletion cluster/calcium/resource.go
@@ -140,7 +140,8 @@ func (c *Calcium) doGetNodeResource(ctx context.Context, nodename string, fix bo
 		}
 	}
 	for vol, ids := range monopolyVolumeByWorkloads {
-		if len(ids) > 1 {
+		idx := utils.Unique(ids, func(i int) string { return ids[i] })
+		if len(ids[:idx]) > 1 {
 			nr.Diffs = append(nr.Diffs, fmt.Sprintf("\tmonopoly volume used by multiple workloads: %s, %+v", vol, ids))
 		}
 	}
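The monopoly-volume check above now deduplicates the workload IDs before counting, so the same workload mounting a monopoly volume several times no longer trips the "multiple workloads" diff. A standalone sketch of that dedupe-then-count idea — uniqueCount here is a hypothetical helper, not the repo's utils.Unique:

```go
package main

import (
	"fmt"
	"sort"
)

// uniqueCount is an illustrative stand-in for a dedupe helper: it sorts the
// IDs and returns how many distinct values remain.
func uniqueCount(ids []string) int {
	sort.Strings(ids)
	n := 0
	for i, id := range ids {
		if i == 0 || id != ids[i-1] {
			n++
		}
	}
	return n
}

func main() {
	// The same workload can report the same monopoly volume more than once;
	// only multiple distinct workloads constitute a real conflict.
	ids := []string{"workload-a", "workload-a"}
	if uniqueCount(ids) > 1 {
		fmt.Println("monopoly volume used by multiple workloads")
	} else {
		fmt.Println("single workload, no conflict")
	}
}
```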
2 changes: 1 addition & 1 deletion scheduler/complex/cpu.go
@@ -81,7 +81,7 @@ func cpuPriorPlan(ctx context.Context, cpu float64, memory int64, scheduleInfos
 	sort.Slice(scheduleInfos, func(i, j int) bool { return scheduleInfos[i].Capacity < scheduleInfos[j].Capacity })
 	p := sort.Search(len(scheduleInfos), func(i int) bool { return scheduleInfos[i].Capacity > 0 })
 	if p == len(scheduleInfos) {
-		return nil, nil, 0, errors.WithStack(types.ErrInsufficientRes)
+		return nil, nil, 0, errors.Wrapf(types.ErrInsufficientRes, "no node remains %.2f pieces of cpu and %d bytes of memory at the same time", cpu, memory)
 	}

 	return scheduleInfos[p:], nodeWorkload, volTotal, nil
2 changes: 1 addition & 1 deletion scheduler/complex/cpu_test.go
@@ -234,7 +234,7 @@ func TestCPUReallocWithPriorPlan(t *testing.T) {
 		"2": 40,
 	}
 	_, _, _, err = po.ReselectCPUNodes(context.TODO(), scheduleInfo, CPU, 2, 0)
-	assert.EqualError(t, err, "not enough resource")
+	assert.EqualError(t, err, "failed to reschedule cpu: no node remains 1.00 pieces of cpu and 0 bytes of memory at the same time: not enough resource")
 }

 func TestGetFullResult(t *testing.T) {
81 changes: 69 additions & 12 deletions scheduler/complex/potassium.go
@@ -49,15 +49,25 @@ func (m *Potassium) SelectStorageNodes(ctx context.Context, scheduleInfos []reso
 	case storage == 0:
 		return scheduleInfos, math.MaxInt64, nil
 	default:
-		log.Infof(ctx, "[SelectStorageNodes] scheduleInfos: %v, need: %d", scheduleInfos, storage)
+		storages := []struct {
+			Nodename string
+			Storage  int64
+		}{}
+		for _, scheduleInfo := range scheduleInfos {
+			storages = append(storages, struct {
+				Nodename string
+				Storage  int64
+			}{scheduleInfo.Name, scheduleInfo.StorageCap})
+		}
+		log.Infof(ctx, "[SelectStorageNodes] resources: %v, need: %d", storages, storage)
 	}

 	leng := len(scheduleInfos)

 	sort.Slice(scheduleInfos, func(i, j int) bool { return scheduleInfos[i].StorageCap < scheduleInfos[j].StorageCap })
 	p := sort.Search(leng, func(i int) bool { return scheduleInfos[i].StorageCap >= storage })
 	if p == leng {
-		return nil, 0, errors.WithStack(types.ErrInsufficientStorage)
+		return nil, 0, errors.Wrapf(types.ErrInsufficientStorage, "no node remains storage more than %d bytes", storage)
 	}

 	scheduleInfos = scheduleInfos[p:]
@@ -73,7 +83,19 @@ func (m *Potassium) SelectStorageNodes(ctx context.Context, scheduleInfos []reso

 // SelectMemoryNodes filter nodes with enough memory
 func (m *Potassium) SelectMemoryNodes(ctx context.Context, scheduleInfos []resourcetypes.ScheduleInfo, quota float64, memory int64) ([]resourcetypes.ScheduleInfo, int, error) {
-	log.Infof(ctx, "[SelectMemoryNodes] scheduleInfos: %v, need cpu: %f, memory: %d", scheduleInfos, quota, memory)
+	resources := []struct {
+		Nodename string
+		CPU      types.CPUMap
+		Memory   int64
+	}{}
+	for _, scheduleInfo := range scheduleInfos {
+		resources = append(resources, struct {
+			Nodename string
+			CPU      types.CPUMap
+			Memory   int64
+		}{scheduleInfo.Name, scheduleInfo.CPU, scheduleInfo.MemCap})
+	}
+	log.Infof(ctx, "[SelectMemoryNodes] resources: %v, need cpu: %f, memory: %d", resources, quota, memory)
 	scheduleInfosLength := len(scheduleInfos)

 	// pick out the nodes that can satisfy the CPU requirement
@@ -83,7 +105,7 @@ func (m *Potassium) SelectMemoryNodes(ctx context.Context, scheduleInfos []resou
 	})
 	// p is at most scheduleInfosLength - 1
 	if p == scheduleInfosLength {
-		return nil, 0, errors.WithStack(types.ErrInsufficientCPU)
+		return nil, 0, errors.Wrapf(types.ErrInsufficientCPU, "no node remains cpu more than %0.2f", quota)
 	}
 	scheduleInfosLength -= p
 	scheduleInfos = scheduleInfos[p:]
@@ -92,7 +114,7 @@ func (m *Potassium) SelectMemoryNodes(ctx context.Context, scheduleInfos []resou
 	sort.Slice(scheduleInfos, func(i, j int) bool { return scheduleInfos[i].MemCap < scheduleInfos[j].MemCap })
 	p = sort.Search(scheduleInfosLength, func(i int) bool { return scheduleInfos[i].MemCap >= memory })
 	if p == scheduleInfosLength {
-		return nil, 0, errors.WithStack(types.ErrInsufficientMEM)
+		return nil, 0, errors.Wrapf(types.ErrInsufficientMEM, "no node remains memory more than %d bytes", memory)
 	}
 	scheduleInfos = scheduleInfos[p:]

@@ -111,7 +133,19 @@ func (m *Potassium) SelectMemoryNodes(ctx context.Context, scheduleInfos []resou

 // SelectCPUNodes select nodes with enough cpus
 func (m *Potassium) SelectCPUNodes(ctx context.Context, scheduleInfos []resourcetypes.ScheduleInfo, quota float64, memory int64) ([]resourcetypes.ScheduleInfo, map[string][]types.CPUMap, int, error) {
-	log.Infof(ctx, "[SelectCPUNodes] scheduleInfos %d, need cpu: %f memory: %d", len(scheduleInfos), quota, memory)
+	resources := []struct {
+		Nodename string
+		Memory   int64
+		CPU      types.CPUMap
+	}{}
+	for _, scheduleInfo := range scheduleInfos {
+		resources = append(resources, struct {
+			Nodename string
+			Memory   int64
+			CPU      types.CPUMap
+		}{scheduleInfo.Name, scheduleInfo.MemCap, scheduleInfo.CPU})
+	}
+	log.Infof(ctx, "[SelectCPUNodes] resources %v, need cpu: %f memory: %d", resources, quota, memory)
 	if quota <= 0 {
 		return nil, nil, 0, errors.WithStack(types.ErrNegativeQuota)
 	}
@@ -124,7 +158,13 @@ func (m *Potassium) SelectCPUNodes(ctx context.Context, scheduleInfos []resource

 // ReselectCPUNodes used for realloc one container with cpu affinity
 func (m *Potassium) ReselectCPUNodes(ctx context.Context, scheduleInfo resourcetypes.ScheduleInfo, CPU types.CPUMap, quota float64, memory int64) (resourcetypes.ScheduleInfo, map[string][]types.CPUMap, int, error) {
-	log.Infof(ctx, "[SelectCPUNodes] scheduleInfo %v, need cpu %f, need memory %d, existing %v", scheduleInfo, quota, memory, CPU)
+	log.Infof(ctx, "[ReselectCPUNodes] resources %v, need cpu %f, need memory %d, existing %v",
+		struct {
+			Nodename string
+			Memory   int64
+			CPU      types.CPUMap
+		}{scheduleInfo.Name, scheduleInfo.MemCap, scheduleInfo.CPU},
+		quota, memory, CPU)
 	var affinityPlan types.CPUMap
 	// remaining quota that's impossible to achieve affinity
 	if scheduleInfo, quota, affinityPlan = cpuReallocPlan(scheduleInfo, quota, CPU, int64(m.sharebase)); quota == 0 {
@@ -139,7 +179,7 @@ func (m *Potassium) ReselectCPUNodes(ctx context.Context, scheduleInfo resourcet

 	scheduleInfos, cpuPlans, total, err := m.SelectCPUNodes(ctx, []resourcetypes.ScheduleInfo{scheduleInfo}, quota, memory)
 	if err != nil {
-		return scheduleInfo, nil, 0, err
+		return scheduleInfo, nil, 0, errors.Wrap(err, "failed to reschedule cpu")
 	}

 	// add affinity plans
@@ -218,7 +258,18 @@ func cpuReallocPlan(scheduleInfo resourcetypes.ScheduleInfo, quota float64, CPU

 // SelectVolumeNodes calculates plans for volume request
 func (m *Potassium) SelectVolumeNodes(ctx context.Context, scheduleInfos []resourcetypes.ScheduleInfo, vbs types.VolumeBindings) ([]resourcetypes.ScheduleInfo, map[string][]types.VolumePlan, int, error) {
-	log.Infof(ctx, "[SelectVolumeNodes] scheduleInfos %v, need volume: %v", scheduleInfos, vbs.ToStringSlice(true, true))
+	resources := []struct {
+		Nodename string
+		Volume   types.VolumeMap
+	}{}
+	for _, scheduleInfo := range scheduleInfos {
+		resources = append(resources, struct {
+			Nodename string
+			Volume   types.VolumeMap
+		}{scheduleInfo.Name, scheduleInfo.Volume})
+	}
+	log.Infof(ctx, "[SelectVolumeNodes] resources %v, need volume: %v", resources, vbs.ToStringSlice(true, true))
+
 	var reqsNorm, reqsMono []int64
 	var vbsNorm, vbsMono, vbsUnlimited types.VolumeBindings

@@ -318,15 +369,21 @@ func (m *Potassium) SelectVolumeNodes(ctx context.Context, scheduleInfos []resou
 	sort.Slice(scheduleInfos, func(i, j int) bool { return scheduleInfos[i].Capacity < scheduleInfos[j].Capacity })
 	p := sort.Search(len(scheduleInfos), func(i int) bool { return scheduleInfos[i].Capacity > 0 })
 	if p == len(scheduleInfos) {
-		return nil, nil, 0, errors.WithStack(types.ErrInsufficientRes)
+		return nil, nil, 0, errors.Wrapf(types.ErrInsufficientRes, "no node remains volumes for requests %+v", vbs.ToStringSlice(true, true))
 	}

 	return scheduleInfos[p:], volumePlans, volTotal, nil
 }

 // ReselectVolumeNodes is used for realloc only
 func (m *Potassium) ReselectVolumeNodes(ctx context.Context, scheduleInfo resourcetypes.ScheduleInfo, existing types.VolumePlan, vbsReq types.VolumeBindings) (resourcetypes.ScheduleInfo, map[string][]types.VolumePlan, int, error) {
-	log.Infof(ctx, "[ReselectVolumeNodes] scheduleInfo %v, need volume: %v, existing %v", scheduleInfo, vbsReq.ToStringSlice(true, true), existing.ToLiteral())
+	log.Infof(ctx, "[ReselectVolumeNodes] resources: %v, need volume: %v, existing %v",
+		struct {
+			Nodename   string
+			Volume     types.VolumeMap
+			InitVolume types.VolumeMap
+		}{scheduleInfo.Name, scheduleInfo.Volume, scheduleInfo.InitVolume},
+		vbsReq.ToStringSlice(true, true), existing.ToLiteral())
 	affinityPlan := types.VolumePlan{}
 	needReschedule := types.VolumeBindings{}
 	norm, mono, unlim := distinguishVolumeBindings(vbsReq)
@@ -388,7 +445,7 @@ func (m *Potassium) ReselectVolumeNodes(ctx context.Context, scheduleInfo resour
 	}
 	scheduleInfos, volumePlans, total, err := m.SelectVolumeNodes(ctx, []resourcetypes.ScheduleInfo{scheduleInfo}, needReschedule)
 	if err != nil {
-		return scheduleInfo, nil, 0, errors.WithMessage(err, "failed to reschedule")
+		return scheduleInfo, nil, 0, errors.Wrap(err, "failed to reschedule volume")
 	}

 	// merge
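The selectors in this file all share one skeleton: sort nodes by the remaining resource, sort.Search for the first node that satisfies the request, and wrap the sentinel error with the concrete unmet requirement when none does. A self-contained sketch of that skeleton, with an illustrative node type and a local stand-in for types.ErrInsufficientStorage (the real sentinel's text may differ):

```go
package main

import (
	"fmt"
	"sort"

	"github.com/pkg/errors"
)

var ErrInsufficientStorage = errors.New("not enough storage") // illustrative sentinel

type nodeInfo struct {
	Name       string
	StorageCap int64
}

// selectStorageNodes keeps only the nodes whose remaining storage covers the
// request, mirroring the sort + sort.Search structure of SelectStorageNodes.
func selectStorageNodes(nodes []nodeInfo, storage int64) ([]nodeInfo, error) {
	sort.Slice(nodes, func(i, j int) bool { return nodes[i].StorageCap < nodes[j].StorageCap })
	p := sort.Search(len(nodes), func(i int) bool { return nodes[i].StorageCap >= storage })
	if p == len(nodes) {
		// Name the unmet requirement instead of returning the bare sentinel.
		return nil, errors.Wrapf(ErrInsufficientStorage, "no node remains storage more than %d bytes", storage)
	}
	return nodes[p:], nil
}

func main() {
	nodes := []nodeInfo{{"n1", 1 << 30}, {"n2", 4 << 30}}

	ok, err := selectStorageNodes(nodes, 2<<30)
	fmt.Println(ok, err) // [{n2 4294967296}] <nil>

	_, err = selectStorageNodes(nodes, 8<<30)
	fmt.Println(err) // no node remains storage more than 8589934592 bytes: not enough storage
}
```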
9 changes: 8 additions & 1 deletion scheduler/complex/potassium_test.go
@@ -373,7 +373,7 @@ func TestSelectCPUNodesWithMemoryLimit(t *testing.T) {
 	// test 2 nodes with insufficient memory
 	nodes = generateNodes(2, 2, 1024, 0, 10)
 	_, _, err = SelectCPUNodes(k, nodes, nil, 0.1, 1025, 1, true)
-	assert.EqualError(t, err, types.ErrInsufficientRes.Error())
+	assert.EqualError(t, err, "no node remains 0.10 pieces of cpu and 1025 bytes of memory at the same time: not enough resource")

 	// test need exceeding each node's capacity
 	nodes = generateNodes(2, 2, 1024, 0, 10)
@@ -1794,6 +1794,13 @@ func TestSelectVolumeNormAndMono(t *testing.T) {
 		"AUTO:/data1:rwm:50": map[string]int64{
 			"/data1": 2000,
 		},
+	})) || reflect.DeepEqual(res["n0"][0], types.MustToVolumePlan(map[string]map[string]int64{
+		"AUTO:/data0:rw:50": map[string]int64{
+			"/data1": 50,
+		},
+		"AUTO:/data1:rwm:50": map[string]int64{
+			"/data0": 1000,
+		},
 	})))

 	// round 2
3 changes: 1 addition & 2 deletions store/etcdv3/processing.go
@@ -59,8 +59,7 @@ func (m *Mercury) doLoadProcessing(ctx context.Context, appname, entryname strin
 		nodesCount[nodename] += count
 	}

-	log.Debug(ctx, "[doLoadProcessing] Processing result:")
-	litter.Dump(nodesCount)
+	log.Debugf(ctx, "[doLoadProcessing] Processing result: %s", litter.Sdump(nodesCount))
 	setCount(nodesCount, strategyInfos)
 	return nil
 }
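litter.Sdump (from github.com/sanity-io/litter) takes the values to dump and returns their string representation, unlike litter.Dump which prints straight to stdout — which is why the debug line above passes nodesCount to it and embeds the result in a formatted message. A minimal usage sketch:

```go
package main

import (
	"fmt"

	"github.com/sanity-io/litter"
)

func main() {
	nodesCount := map[string]int{"node-1": 2, "node-2": 1}
	// Sdump returns the pretty-printed form instead of writing to stdout,
	// so it can be embedded in a single formatted log line.
	fmt.Printf("Processing result: %s\n", litter.Sdump(nodesCount))
}
```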
2 changes: 1 addition & 1 deletion strategy/average.go
@@ -30,7 +30,7 @@ func AveragePlan(ctx context.Context, infos []Info, need, total, limit int) (map
 		return nil, errors.WithStack(types.NewDetailedErr(types.ErrInsufficientCap, "insufficient nodes, at least 1 needed"))
 	}
 	if p < limit {
-		return nil, types.NewDetailedErr(types.ErrInsufficientRes, fmt.Sprintf("insufficient nodes, %d more needed", limit-p))
+		return nil, types.NewDetailedErr(types.ErrInsufficientRes, fmt.Sprintf("not enough nodes with capacity of %d, require %d nodes", need, limit))
 	}
 	deployMap := map[string]int{}
 	for _, strategyInfo := range infos[:limit] {
4 changes: 2 additions & 2 deletions strategy/average_test.go
@@ -36,9 +36,9 @@ func TestAveragePlan(t *testing.T) {

 	nodes = genNodesByCapCount([]int{1, 2, 3, 4, 5}, []int{3, 3, 3, 3, 3})
 	_, err = AveragePlan(context.TODO(), nodes, 4, 100, 4)
-	assert.EqualError(t, err, "not enough resource: insufficient nodes, 2 more needed")
+	assert.EqualError(t, err, "not enough resource: not enough nodes with capacity of 4, require 4 nodes")

 	nodes = genNodesByCapCount([]int{1, 2, 3, 4, 5}, []int{3, 3, 3, 3, 3})
 	_, err = AveragePlan(context.TODO(), nodes, 2, 100, 0)
-	assert.EqualError(t, err, "not enough resource: insufficient nodes, 1 more needed")
+	assert.EqualError(t, err, "not enough resource: not enough nodes with capacity of 2, require 5 nodes")
 }
2 changes: 1 addition & 1 deletion strategy/communism.go
@@ -68,7 +68,7 @@ func CommunismPlan(ctx context.Context, infos []Info, need, total, limit int) (m
 	heap.Init(iHeap)
 	for {
 		if iHeap.Len() == 0 {
-			return nil, errors.WithStack(types.ErrInsufficientRes)
+			return nil, errors.Wrapf(types.ErrInsufficientRes, "reached nodelimit, a node can host at most %d instances", limit)
 		}
 		info := heap.Pop(iHeap).(Info)
 		deploy[info.Nodename]++
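The error above fires when the heap runs empty, i.e. every node has either exhausted its capacity or already hosts `limit` instances. A simplified, hypothetical sketch of that even-spread loop — a linear minimum search instead of the repo's heap, with illustrative types — reproducing the message asserted in the test below:

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

var errInsufficientRes = errors.New("not enough resource") // mirrors types.ErrInsufficientRes

// node is an illustrative stand-in for strategy.Info.
type node struct {
	name     string
	count    int // instances already deployed
	capacity int // free capacity left
}

// evenSpread assigns `need` instances one at a time to the least-loaded node,
// never pushing any node past the per-node limit or its remaining capacity.
func evenSpread(nodes []node, need, limit int) (map[string]int, error) {
	deploy := map[string]int{}
	for ; need > 0; need-- {
		best := -1
		for i, n := range nodes {
			if n.capacity == 0 || n.count >= limit {
				continue // this node can take nothing more
			}
			if best == -1 || n.count < nodes[best].count {
				best = i
			}
		}
		if best == -1 {
			return nil, errors.Wrapf(errInsufficientRes, "reached nodelimit, a node can host at most %d instances", limit)
		}
		nodes[best].count++
		nodes[best].capacity--
		deploy[nodes[best].name]++
	}
	return deploy, nil
}

func main() {
	nodes := []node{{"n1", 3, 3}, {"n2", 5, 4}, {"n3", 7, 5}, {"n4", 10, 10}}
	_, err := evenSpread(nodes, 3, 5)
	fmt.Println(err) // reached nodelimit, a node can host at most 5 instances: not enough resource
}
```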
2 changes: 1 addition & 1 deletion strategy/communism_test.go
@@ -128,7 +128,7 @@ func TestCommunismPlanCapacityPriority(t *testing.T) {
 	// test limit
 	nodes = genNodesByCapCount([]int{3, 4, 5, 10}, []int{3, 5, 7, 10})
 	deploy, err = CommunismPlan(context.TODO(), nodes, 3, 10, 5)
-	assert.EqualError(t, err, "not enough resource")
+	assert.EqualError(t, err, "reached nodelimit, a node can host at most 5 instances: not enough resource")
 	deploy, err = CommunismPlan(context.TODO(), nodes, 3, 10, 6)
 	assert.Nil(t, err)
 }
2 changes: 1 addition & 1 deletion strategy/fill.go
@@ -46,5 +46,5 @@ func FillPlan(ctx context.Context, infos []Info, need, _, limit int) (_ map[stri
 		}
 	}
 	return nil, errors.WithStack(types.NewDetailedErr(types.ErrInsufficientRes,
-		fmt.Sprintf("insufficient nodes to fill %d, %d more nodes needed", need, limit)))
+		fmt.Sprintf("not enough nodes that can fill up to %d instances, require %d nodes", need, limit)))
 }
2 changes: 1 addition & 1 deletion strategy/fill_test.go
@@ -82,5 +82,5 @@ func TestFillPlan(t *testing.T) {
 	assert.EqualValues(t, 1, r["2"])

 	_, err = FillPlan(context.TODO(), nodes, 5, 1000, 0)
-	assert.EqualError(t, err, "not enough resource: insufficient nodes to fill 5, 1 more nodes needed")
+	assert.EqualError(t, err, "not enough resource: not enough nodes that can fill up to 5 instances, require 1 nodes")
 }
4 changes: 2 additions & 2 deletions strategy/strategy.go
@@ -25,14 +25,14 @@ const (
 )

 // Plans .
-var Plans = map[string]startegyFunc{
+var Plans = map[string]strategyFunc{
 	Auto:   CommunismPlan,
 	Fill:   FillPlan,
 	Each:   AveragePlan,
 	Global: GlobalPlan,
 }

-type startegyFunc = func(_ context.Context, _ []Info, need, total, limit int) (map[string]int, error)
+type strategyFunc = func(_ context.Context, _ []Info, need, total, limit int) (map[string]int, error)

 // Deploy .
 func Deploy(ctx context.Context, opts *types.DeployOptions, strategyInfos []Info, total int) (map[string]int, error) {
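Besides the startegyFunc → strategyFunc typo fix, this file is where a deploy strategy is looked up by name in the Plans map. A hypothetical dispatch sketch of that map-of-functions pattern; the strategy body here is a placeholder, not the repo's implementation:

```go
package main

import (
	"context"
	"fmt"
)

// Info is a trimmed stand-in for strategy.Info.
type Info struct {
	Nodename string
	Capacity int
}

type strategyFunc = func(ctx context.Context, infos []Info, need, total, limit int) (map[string]int, error)

// Plans maps a strategy name to its planner, as in strategy/strategy.go.
var Plans = map[string]strategyFunc{
	"auto": func(_ context.Context, infos []Info, need, _, _ int) (map[string]int, error) {
		// placeholder strategy: put everything on the first node
		return map[string]int{infos[0].Nodename: need}, nil
	},
}

func deploy(ctx context.Context, strategy string, infos []Info, need, total, limit int) (map[string]int, error) {
	plan, ok := Plans[strategy]
	if !ok {
		return nil, fmt.Errorf("unknown strategy %q", strategy)
	}
	return plan(ctx, infos, need, total, limit)
}

func main() {
	res, err := deploy(context.TODO(), "auto", []Info{{Nodename: "n1", Capacity: 3}}, 2, 3, 0)
	fmt.Println(res, err) // map[n1:2] <nil>
}
```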
