diff --git a/cluster/calcium/realloc.go b/cluster/calcium/realloc.go
index 6b57e6745..80ddb1b39 100644
--- a/cluster/calcium/realloc.go
+++ b/cluster/calcium/realloc.go
@@ -29,6 +29,7 @@ func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOption
 			StorageLimit:  workload.StorageLimit + opts.ResourceOpts.StorageLimit,
 			VolumeRequest: types.MergeVolumeBindings(workload.VolumeRequest, opts.ResourceOpts.VolumeRequest),
 			VolumeLimit:   types.MergeVolumeBindings(workload.VolumeLimit, opts.ResourceOpts.VolumeLimit),
+			VolumeExist:   workload.VolumePlanRequest,
 		},
 	)
 	if err != nil {
@@ -57,8 +58,7 @@ func (c *Calcium) doReallocOnNode(ctx context.Context, nodename string, workload
 	resourceMeta := &types.ResourceMeta{}
 	for _, plan := range plans {
 		if resourceMeta, err = plan.Dispense(resourcetypes.DispenseOptions{
-			Node:             node,
-			ExistingInstance: workload,
+			Node: node,
 		}, resourceMeta); err != nil {
 			return err
 		}
diff --git a/resources/types/types.go b/resources/types/types.go
index 9c82babc9..1291b6029 100644
--- a/resources/types/types.go
+++ b/resources/types/types.go
@@ -63,8 +63,7 @@ type SchedulerV2 func(context.Context, []ScheduleInfo) (ResourcePlans, int, erro
 // DispenseOptions .
 type DispenseOptions struct {
 	*types.Node
-	Index            int
-	ExistingInstance *types.Workload
+	Index int
 }
 
 // ResourcePlans .
diff --git a/resources/volume/volume.go b/resources/volume/volume.go
index 32b555d45..4f26ce7ae 100644
--- a/resources/volume/volume.go
+++ b/resources/volume/volume.go
@@ -17,6 +17,7 @@ type volumeRequest struct {
 	limit    [maxVolumes]types.VolumeBinding
 	requests int
 	limits   int
+	Existing types.VolumePlan
 }
 
 // MakeRequest .
@@ -30,6 +31,7 @@ func MakeRequest(opts types.ResourceOptions) (resourcetypes.ResourceRequest, err
 	}
 	v.requests = len(opts.VolumeRequest)
 	v.limits = len(opts.VolumeLimit)
+	v.Existing = opts.VolumeExist
 
 	sort.Slice(opts.VolumeLimit, func(i, j int) bool {
 		return opts.VolumeLimit[i].ToString(false) < opts.VolumeLimit[j].ToString(false)
@@ -87,7 +89,12 @@ func (v volumeRequest) MakeScheduler() resourcetypes.SchedulerV2 {
 			limit = append(limit, &v.limit[i])
 		}
 
-		scheduleInfos, volumePlans, total, err := schedulerV1.SelectVolumeNodes(ctx, scheduleInfos, request)
+		var volumePlans map[string][]types.VolumePlan
+		if v.Existing != nil {
+			scheduleInfos[0], volumePlans, total, err = schedulerV1.ReselectVolumeNodes(ctx, scheduleInfos[0], v.Existing, request)
+		} else {
+			scheduleInfos, volumePlans, total, err = schedulerV1.SelectVolumeNodes(ctx, scheduleInfos, request)
+		}
 		return ResourcePlans{
 			capacity: resourcetypes.GetCapacity(scheduleInfos),
 			request:  request,
@@ -172,22 +179,6 @@ func (rp ResourcePlans) Dispense(opts resourcetypes.DispenseOptions, r *types.Re
 	}
 	r.VolumePlanRequest = rp.plan[opts.Node.Name][opts.Index]
 
-	// if there are existing ones, ensure new volumes are compatible
-	if opts.ExistingInstance != nil {
-		found := false
-		for _, plan := range rp.plan[opts.Node.Name] {
-			if plan.Compatible(opts.ExistingInstance.VolumePlanRequest) {
-				r.VolumePlanRequest = plan
-				found = true
-				break
-			}
-		}
-
-		if !found {
-			return nil, errors.Wrap(types.ErrInsufficientVolume, "incompatible volume plans")
-		}
-	}
-
 	// fix plans while limit > request
 	r.VolumePlanLimit = types.VolumePlan{}
 	for i := range rp.request {
@@ -204,7 +195,8 @@ func (rp ResourcePlans) Dispense(opts resourcetypes.DispenseOptions, r *types.Re
 	}
 
 	// judge if volume changed
-	r.VolumeChanged = opts.ExistingInstance != nil && !r.VolumeLimit.IsEqual(opts.ExistingInstance.VolumeLimit)
+	// TODO@zc
+	r.VolumeChanged = false
 	return r, nil
 }
diff --git a/scheduler/complex/potassium.go b/scheduler/complex/potassium.go
index b9fdc0db7..5299b846b 100644
--- a/scheduler/complex/potassium.go
+++ b/scheduler/complex/potassium.go
@@ -364,6 +364,10 @@ func (m *Potassium) ReselectVolumeNodes(ctx context.Context, scheduleInfo resour
 	affinityPlan.Merge(unlimPlan)
 
 	// schedule new volume requests
+	if len(needReschedule) == 0 {
+		scheduleInfo.Capacity = 1
+		return scheduleInfo, map[string][]types.VolumePlan{scheduleInfo.Name: []types.VolumePlan{affinityPlan}}, 1, nil
+	}
 	scheduleInfos, volumePlans, total, err := m.SelectVolumeNodes(ctx, []resourcetypes.ScheduleInfo{scheduleInfo}, needReschedule)
 	if err != nil {
 		return scheduleInfo, nil, 0, err
diff --git a/types/resource.go b/types/resource.go
index 8635b40c2..2354e68e6 100644
--- a/types/resource.go
+++ b/types/resource.go
@@ -19,6 +19,7 @@ type ResourceOptions struct {
 	VolumeRequest VolumeBindings
 	VolumeLimit   VolumeBindings
+	VolumeExist   VolumePlan
 
 	StorageRequest int64
 	StorageLimit   int64
@@ -238,3 +239,14 @@ func (p VolumePlan) Merge(p2 VolumePlan) {
 		p[vb] = vm
 	}
 }
+
+// FindAffinityPlan .
+func (p VolumePlan) FindAffinityPlan(req VolumeBinding) (_ VolumeBinding, _ VolumeMap, found bool) {
+	for vb, vm := range p {
+		if vb.Source == req.Source && vb.Destination == req.Destination && vb.Flags == req.Flags {
+			return vb, vm, true
+		}
+	}
+	found = false
+	return
+}
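
For reference, a minimal standalone sketch of how the new FindAffinityPlan helper behaves. The VolumeBinding/VolumeMap/VolumePlan definitions below are trimmed stand-ins mirroring types/resource.go, not the repo's full types, and the realloc scenario in main is an assumption for illustration. The point is that the match compares Source/Destination/Flags but not SizeInBytes, presumably so a resized binding can still be paired with its existing allocation and only the remainder lands in needReschedule for SelectVolumeNodes.

package main

import "fmt"

// Trimmed stand-ins for types.VolumeBinding, types.VolumeMap and
// types.VolumePlan (assumptions; see types/resource.go for the real ones).
type VolumeBinding struct {
	Source      string
	Destination string
	Flags       string
	SizeInBytes int64
}

type VolumeMap map[string]int64

type VolumePlan map[VolumeBinding]VolumeMap

// FindAffinityPlan matches on Source, Destination and Flags only, never on
// SizeInBytes, so a binding whose size changed still finds its old plan.
func (p VolumePlan) FindAffinityPlan(req VolumeBinding) (VolumeBinding, VolumeMap, bool) {
	for vb, vm := range p {
		if vb.Source == req.Source && vb.Destination == req.Destination && vb.Flags == req.Flags {
			return vb, vm, true
		}
	}
	return VolumeBinding{}, nil, false
}

func main() {
	// Existing plan: an auto-scheduled 1 GiB volume placed on /dev/sda.
	existing := VolumePlan{
		{Source: "AUTO", Destination: "/data", Flags: "rw", SizeInBytes: 1 << 30}: {"/dev/sda": 1 << 30},
	}

	// A realloc asks for 2 GiB at the same mount point: the old allocation
	// is still found, even though the requested size differs.
	req := VolumeBinding{Source: "AUTO", Destination: "/data", Flags: "rw", SizeInBytes: 2 << 30}
	if vb, vm, ok := existing.FindAffinityPlan(req); ok {
		fmt.Printf("affinity: %s -> %v (old size %d)\n", vb.Destination, vm, vb.SizeInBytes)
	}
}

This lines up with the ReselectVolumeNodes hunk above: bindings with an affinity match are merged into affinityPlan, and when needReschedule ends up empty the new early return skips SelectVolumeNodes entirely.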