Skip to content

Commit

Permalink
Merge pull request #287 from ffromani/scoring-strat-support
Browse files Browse the repository at this point in the history
sched: add support for setting the ScoringStrategy
  • Loading branch information
ffromani authored Feb 22, 2024
2 parents 1ee111c + eed8d05 commit 5b8c893
Show file tree
Hide file tree
Showing 8 changed files with 555 additions and 38 deletions.
5 changes: 3 additions & 2 deletions pkg/commands/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ func NewRenderSchedulerPluginCommand(env *deployer.Environment, commonOpts *opti
}

renderOpts := options.Scheduler{
Replicas: int32(commonOpts.Replicas),
PullIfNotPresent: commonOpts.PullIfNotPresent,
Replicas: int32(commonOpts.Replicas),
PullIfNotPresent: commonOpts.PullIfNotPresent,
ScoringStratConfigData: commonOpts.SchedScoringStratConfigData,
}
schedObjs, err := schedManifests.Render(env.Log, renderOpts)
if err != nil {
Expand Down
14 changes: 12 additions & 2 deletions pkg/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ const (
)

type internalOptions struct {
rteConfigFile string
plat string
rteConfigFile string
schedScoringStratConfigFile string
plat string
}

func ShowHelp(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -98,6 +99,7 @@ func NewRootCommand(extraCmds ...NewCommandFunc) *cobra.Command {
func InitFlags(flags *pflag.FlagSet, commonOpts *options.Options, internalOpts *internalOptions) {
flags.StringVarP(&internalOpts.plat, "platform", "P", "", "platform kind:version to deploy on (example kubernetes:v1.22)")
flags.StringVar(&internalOpts.rteConfigFile, "rte-config-file", "", "inject rte configuration reading from this file.")
flags.StringVar(&internalOpts.schedScoringStratConfigFile, "sched-scoring-strat-config-file", "", "inject scheduler scoring strategy configuration reading from this file.")

flags.IntVarP(&commonOpts.Replicas, "replicas", "R", 1, "set the replica value - where relevant.")
flags.DurationVarP(&commonOpts.WaitInterval, "wait-interval", "E", 2*time.Second, "wait interval.")
Expand Down Expand Up @@ -142,6 +144,14 @@ func PostSetupOptions(env *deployer.Environment, commonOpts *options.Options, in
commonOpts.RTEConfigData = string(data)
env.Log.Info("RTE config: read", "bytes", len(commonOpts.RTEConfigData))
}
if internalOpts.schedScoringStratConfigFile != "" {
data, err := os.ReadFile(internalOpts.schedScoringStratConfigFile)
if err != nil {
return err
}
commonOpts.SchedScoringStratConfigData = string(data)
env.Log.Info("Scheduler Scoring Strategy config: read", "bytes", len(commonOpts.SchedScoringStratConfigData))
}
return validateUpdaterType(commonOpts.UpdaterType)
}

Expand Down
13 changes: 12 additions & 1 deletion pkg/manifests/sched/sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
apiextensionv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/yaml"

"github.com/k8stopologyawareschedwg/deployer/pkg/deployer/platform"
"github.com/k8stopologyawareschedwg/deployer/pkg/manifests"
Expand Down Expand Up @@ -95,7 +96,17 @@ func (mf Manifests) Render(logger logr.Logger, opts options.Scheduler) (Manifest
},
}

err := schedupdate.SchedulerConfig(ret.ConfigMap, opts.ProfileName, &params)
var err error

if len(opts.ScoringStratConfigData) > 0 {
params.ScoringStrategy = &manifests.ScoringStrategyParams{}
err = yaml.Unmarshal([]byte(opts.ScoringStratConfigData), params.ScoringStrategy)
if err != nil {
return ret, err
}
}

err = schedupdate.SchedulerConfig(ret.ConfigMap, opts.ProfileName, &params)
if err != nil {
return ret, err
}
Expand Down
88 changes: 85 additions & 3 deletions pkg/manifests/schedparams.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ const (
CacheInformerDedicated = "Dedicated"
)

const (
ScoringStrategyMostAllocated = "MostAllocated"
ScoringStrategyBalancedAllocation = "BalancedAllocation"
ScoringStrategyLeastAllocated = "LeastAllocated"
)

func ValidateForeignPodsDetectMode(value string) error {
switch value {
case ForeignPodsDetectNone:
Expand Down Expand Up @@ -91,9 +97,35 @@ type ConfigCacheParams struct {
InformerMode *string
}

type ResourceSpecParams struct {
// Name of the resource.
Name string `json:"name"`
// Weight of the resource.
Weight int64 `json:"weight,omitempty"`
}

type ScoringStrategyParams struct {
Type string `json:"type,omitempty"`
Resources []ResourceSpecParams `json:"resources,omitempty"`
}

func ValidateScoringStrategyType(value string) error {
switch value {
case ScoringStrategyMostAllocated:
return nil
case ScoringStrategyBalancedAllocation:
return nil
case ScoringStrategyLeastAllocated:
return nil
default:
return fmt.Errorf("unsupported scoringStrategyType: %v", value)
}
}

type ConfigParams struct {
ProfileName string // can't be empty, so no need for pointer
Cache *ConfigCacheParams
ProfileName string // can't be empty, so no need for pointer
Cache *ConfigCacheParams
ScoringStrategy *ScoringStrategyParams
}

func DecodeSchedulerProfilesFromData(data []byte) ([]ConfigParams, error) {
Expand Down Expand Up @@ -131,7 +163,7 @@ func DecodeSchedulerProfilesFromData(data []byte) ([]ConfigParams, error) {
for _, plConf := range pluginConfigs {
pluginConf, ok := plConf.(map[string]interface{})
if !ok {
klog.V(1).InfoS("unexpected profile coonfig data")
klog.V(1).InfoS("unexpected profile config data")
return params, nil
}

Expand Down Expand Up @@ -225,5 +257,55 @@ func extractParams(profileName string, args map[string]interface{}) (ConfigParam
params.Cache.InformerMode = &informerMode
}
}

scoringStratArgs, ok, err := unstructured.NestedMap(args, "scoringStrategy")
if err != nil {
return params, fmt.Errorf("cannot process field scoringStrategy: %w", err)
}
if ok {
params.ScoringStrategy = &ScoringStrategyParams{}

scoringType, cacheOk, err := unstructured.NestedString(scoringStratArgs, "type")
if err != nil {
return params, fmt.Errorf("cannot process field scoringStrategy.type: %w", err)
}
if cacheOk {
if err := ValidateScoringStrategyType(scoringType); err != nil {
return params, err
}
params.ScoringStrategy.Type = scoringType
}

scoringRess, cacheOk, err := unstructured.NestedSlice(scoringStratArgs, "resources")
if err != nil {
return params, fmt.Errorf("cannot process field scoringStrategy.resources: %w", err)
}
if cacheOk {
var resources []ResourceSpecParams
for idx, scRes := range scoringRess {
res, ok := scRes.(map[string]interface{})
if !ok {
return params, fmt.Errorf("unexpected scoringStrategy.resources[%d] data", idx)
}

name, ok, err := unstructured.NestedString(res, "name")
if !ok || err != nil {
return params, fmt.Errorf("unexpected scoringStrategy.resources[%d].name data (err=%v)", idx, err)
}

weight, ok, err := unstructured.NestedFloat64(res, "weight")
if !ok || err != nil {
return params, fmt.Errorf("unexpected scoringStrategy.resources[%d].weight data (err=%v)", idx, err)
}

resources = append(resources, ResourceSpecParams{
Name: name,
Weight: int64(weight),
})
}
params.ScoringStrategy.Resources = resources
}
}

return params, nil
}
Loading

0 comments on commit 5b8c893

Please sign in to comment.