Skip to content

Commit

Permalink
Introduce LocalSSDSizeProvider interface for GCE
Browse files Browse the repository at this point in the history
  • Loading branch information
atwamahmoud committed Feb 16, 2024
1 parent 5286b3f commit e7ff1cd
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
cloudBuilder "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/builder"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/externalgrpc/examples/external-grpc-cloud-provider-service/wrapper"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/externalgrpc/protos"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/gce/localssdsize"
"k8s.io/autoscaler/cluster-autoscaler/config"
kube_flag "k8s.io/component-base/cli/flag"
klog "k8s.io/klog/v2"
Expand Down Expand Up @@ -121,7 +122,8 @@ func main() {
NodeGroups: *nodeGroupsFlag,
ClusterName: *clusterName,
GCEOptions: config.GCEOptions{
ConcurrentRefreshes: 1,
ConcurrentRefreshes: 1,
LocalSSDDiskSizeProvider: localssdsize.NewSimpleLocalSSDProvider(),
},
UserAgent: "user-agent",
}
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,12 +378,12 @@ func BuildGCE(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscover
defer config.Close()
}

manager, err := CreateGceManager(config, do, opts.Regional, opts.GCEOptions.ConcurrentRefreshes, opts.UserAgent, opts.GCEOptions.DomainUrl, opts.GCEOptions.MigInstancesMinRefreshWaitTime)
manager, err := CreateGceManager(config, do, opts.GCEOptions.LocalSSDDiskSizeProvider, opts.Regional, opts.GCEOptions.ConcurrentRefreshes, opts.UserAgent, opts.GCEOptions.DomainUrl, opts.GCEOptions.MigInstancesMinRefreshWaitTime)
if err != nil {
klog.Fatalf("Failed to create GCE Manager: %v", err)
}

pricingModel := NewGcePriceModel(NewGcePriceInfo())
pricingModel := NewGcePriceModel(NewGcePriceInfo(), opts.GCEOptions.LocalSSDDiskSizeProvider)
provider, err := BuildGceCloudProvider(manager, rl, pricingModel)
if err != nil {
klog.Fatalf("Failed to create GCE cloud provider: %v", err)
Expand Down
53 changes: 29 additions & 24 deletions cluster-autoscaler/cloudprovider/gce/gce_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/gce/localssdsize"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -112,19 +113,22 @@ type gceManagerImpl struct {
migInfoProvider MigInfoProvider
migLister MigLister

location string
projectId string
domainUrl string
templates *GceTemplateBuilder
interrupt chan struct{}
regional bool
explicitlyConfigured map[GceRef]bool
migAutoDiscoverySpecs []migAutoDiscoveryConfig
reserved *GceReserved
location string
projectId string
domainUrl string
templates *GceTemplateBuilder
interrupt chan struct{}
regional bool
explicitlyConfigured map[GceRef]bool
migAutoDiscoverySpecs []migAutoDiscoveryConfig
reserved *GceReserved
localSSDDiskSizeProvider localssdsize.LocalSSDSizeProvider
}

// CreateGceManager constructs GceManager object.
func CreateGceManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, regional bool, concurrentGceRefreshes int, userAgent, domainUrl string, migInstancesMinRefreshWaitTime time.Duration) (GceManager, error) {
func CreateGceManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions,
localSSDDiskSizeProvider localssdsize.LocalSSDSizeProvider,
regional bool, concurrentGceRefreshes int, userAgent, domainUrl string, migInstancesMinRefreshWaitTime time.Duration) (GceManager, error) {
// Create Google Compute Engine token.
var err error
tokenSource := google.ComputeTokenSource("")
Expand Down Expand Up @@ -181,19 +185,20 @@ func CreateGceManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGr
cache := NewGceCache()
migLister := NewMigLister(cache)
manager := &gceManagerImpl{
cache: cache,
GceService: gceService,
migLister: migLister,
migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, concurrentGceRefreshes, migInstancesMinRefreshWaitTime),
location: location,
regional: regional,
projectId: projectId,
templates: &GceTemplateBuilder{},
interrupt: make(chan struct{}),
explicitlyConfigured: make(map[GceRef]bool),
concurrentGceRefreshes: concurrentGceRefreshes,
reserved: &GceReserved{},
domainUrl: domainUrl,
cache: cache,
GceService: gceService,
migLister: migLister,
migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, concurrentGceRefreshes, migInstancesMinRefreshWaitTime),
location: location,
regional: regional,
projectId: projectId,
templates: &GceTemplateBuilder{},
interrupt: make(chan struct{}),
explicitlyConfigured: make(map[GceRef]bool),
concurrentGceRefreshes: concurrentGceRefreshes,
reserved: &GceReserved{},
domainUrl: domainUrl,
localSSDDiskSizeProvider: localSSDDiskSizeProvider,
}

if err := manager.fetchExplicitMigs(discoveryOpts.NodeGroupSpecs); err != nil {
Expand Down Expand Up @@ -599,7 +604,7 @@ func (m *gceManagerImpl) GetMigTemplateNode(mig Mig) (*apiv1.Node, error) {
if err != nil {
return nil, err
}
return m.templates.BuildNodeFromTemplate(mig, migOsInfo, template, machineType.CPU, machineType.Memory, nil, m.reserved)
return m.templates.BuildNodeFromTemplate(mig, migOsInfo, template, machineType.CPU, machineType.Memory, nil, m.reserved, m.localSSDDiskSizeProvider)
}

// parseMIGAutoDiscoverySpecs returns any provided NodeGroupAutoDiscoverySpecs
Expand Down
44 changes: 23 additions & 21 deletions cluster-autoscaler/cloudprovider/gce/gce_price_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/gce/localssdsize"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"

Expand All @@ -30,13 +31,15 @@ import (

// GcePriceModel implements PriceModel interface for GCE.
type GcePriceModel struct {
PriceInfo PriceInfo
PriceInfo PriceInfo
localSSDSizeProvider localssdsize.LocalSSDSizeProvider
}

// NewGcePriceModel gets a new instance of GcePriceModel
func NewGcePriceModel(info PriceInfo) *GcePriceModel {
func NewGcePriceModel(info PriceInfo, localSSDSizeProvider localssdsize.LocalSSDSizeProvider) *GcePriceModel {
return &GcePriceModel{
PriceInfo: info,
PriceInfo: info,
localSSDSizeProvider: localSSDSizeProvider,
}
}

Expand All @@ -55,27 +58,26 @@ const DefaultBootDiskSizeGB = 100
func (model *GcePriceModel) NodePrice(node *apiv1.Node, startTime time.Time, endTime time.Time) (float64, error) {
price := 0.0
basePriceFound := false

// Base instance price
machineType := ""
if node.Labels != nil {
if machineType, found := getInstanceTypeFromLabels(node.Labels); found {
priceMapToUse := model.PriceInfo.InstancePrices()
if hasPreemptiblePricing(node) {
priceMapToUse = model.PriceInfo.PreemptibleInstancePrices()
}
if basePricePerHour, found := priceMapToUse[machineType]; found {
price = basePricePerHour * getHours(startTime, endTime)
basePriceFound = true
} else {
klog.Warningf("Pricing information not found for instance type %v; will fallback to default pricing", machineType)
}
if _machineType, found := getInstanceTypeFromLabels(node.Labels); found {
machineType = _machineType
}
}
// Base instance price
priceMapToUse := model.PriceInfo.InstancePrices()
if hasPreemptiblePricing(node) {
priceMapToUse = model.PriceInfo.PreemptibleInstancePrices()
}
if basePricePerHour, found := priceMapToUse[machineType]; found {
price = basePricePerHour * getHours(startTime, endTime)
basePriceFound = true
} else {
klog.Warningf("Pricing information not found for instance type %v; will fallback to default pricing", machineType)
}
if !basePriceFound {
if machineType, found := getInstanceTypeFromLabels(node.Labels); found {
price = model.getBasePrice(node.Status.Capacity, machineType, startTime, endTime)
price = price * model.getPreemptibleDiscount(node)
}
price = model.getBasePrice(node.Status.Capacity, machineType, startTime, endTime)
price = price * model.getPreemptibleDiscount(node)
}

// Ephemeral Storage
Expand All @@ -86,7 +88,7 @@ func (model *GcePriceModel) NodePrice(node *apiv1.Node, startTime time.Time, end
if hasPreemptiblePricing(node) {
localSsdPrice = model.PriceInfo.SpotLocalSsdPricePerHour()
}
price += localSsdCount * float64(LocalSSDDiskSizeInGiB) * localSsdPrice * getHours(startTime, endTime)
price += localSsdCount * float64(model.localSSDSizeProvider.SSDSizeInGiB(machineType)) * localSsdPrice * getHours(startTime, endTime)
}

// Boot disk price
Expand Down
10 changes: 7 additions & 3 deletions cluster-autoscaler/cloudprovider/gce/gce_price_model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/gce/localssdsize"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
Expand Down Expand Up @@ -63,7 +64,10 @@ func testNode(t *testing.T, nodeName string, instanceType string, millicpu int64
func testNodeEphemeralStorage(t *testing.T, nodeName string, isEphemeralStorageLocalSsd bool, localSsdCount int, bootDiskType string, bootDiskSize int, isSpot bool) *apiv1.Node {
node := testNode(t, nodeName, "", 8000, 30*units.GiB, "", 0, false, isSpot)
if isEphemeralStorageLocalSsd {
AddEphemeralStorageToNode(node, int64(localSsdCount)*LocalSSDDiskSizeInGiB)
simpleLocalSSDProvider := localssdsize.NewSimpleLocalSSDProvider()
machineType := ""
ssdSize := simpleLocalSSDProvider.SSDSizeInGiB(machineType)
AddEphemeralStorageToNode(node, int64(localSsdCount)*int64(ssdSize))
} else {
AddEphemeralStorageToNode(node, int64(bootDiskSize))
}
Expand Down Expand Up @@ -218,7 +222,7 @@ func TestGetNodePrice(t *testing.T) {

for tn, tc := range cases {
t.Run(tn, func(t *testing.T) {
model := NewGcePriceModel(NewGcePriceInfo())
model := NewGcePriceModel(NewGcePriceInfo(), localssdsize.NewSimpleLocalSSDProvider())
now := time.Now()

price1, err := model.NodePrice(tc.cheaperNode, now, now.Add(time.Hour))
Expand All @@ -237,7 +241,7 @@ func TestGetPodPrice(t *testing.T) {
pod2 := BuildTestPodWithEphemeralStorage("a2", 2*100, 2*500*units.MiB, 2*100*units.GiB)
pod3 := BuildTestPodWithEphemeralStorage("a2", 2*100, 2*500*units.MiB, 100*units.GiB)

model := NewGcePriceModel(NewGcePriceInfo())
model := NewGcePriceModel(NewGcePriceInfo(), localssdsize.NewSimpleLocalSSDProvider())
now := time.Now()

price1, err := model.PodPrice(pod1, now, now.Add(time.Hour))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package localssdsize

// LocalSSDSizeProvider contains methods to calculate local ssd disk size for GCE based on some input parameters (e.g. machine type name)
type LocalSSDSizeProvider interface {
// Computes local ssd disk size in GiB based on machine type name
SSDSizeInGiB(string) uint64
}

// LocalSSDDiskSizeInGiB is the size of each local SSD in GiB
// (cf. https://cloud.google.com/compute/docs/disks/local-ssd)
const LocalSSDDiskSizeInGiB = uint64(375)

// SimpleLocalSSDProvider implements LocalSSDSizeProvider
// It always returns a constant size
type SimpleLocalSSDProvider struct {
ssdDiskSize uint64
}

// NewSimpleLocalSSDProvider creates an instance of SimpleLocalSSDProvider with `LocalSSDDiskSizeInGiB` as the disk size and returns a pointer to it
func NewSimpleLocalSSDProvider() LocalSSDSizeProvider {
return &SimpleLocalSSDProvider{
ssdDiskSize: LocalSSDDiskSizeInGiB,
}
}

// SSDSizeInGiB Returns a constant disk size in GiB
// First parameter is not used and added to conform to the interface `LocalSSDSizeProvider`
func (lsp *SimpleLocalSSDProvider) SSDSizeInGiB(_ string) uint64 {
return lsp.ssdDiskSize
}
14 changes: 6 additions & 8 deletions cluster-autoscaler/cloudprovider/gce/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,14 @@ import (
"sigs.k8s.io/yaml"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/gce/localssdsize"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
)

// GceTemplateBuilder builds templates for GCE nodes.
type GceTemplateBuilder struct{}

// LocalSSDDiskSizeInGiB is the size of each local SSD in GiB
// (cf. https://cloud.google.com/compute/docs/disks/local-ssd)
const LocalSSDDiskSizeInGiB = 375

// These annotations are used internally only to store information in node temlate and use it later in CA, the actuall nodes won't have these annotations.
const (
// LocalSsdCountAnnotation is the annotation for number of attached local SSDs to the node.
Expand Down Expand Up @@ -186,7 +183,7 @@ func (t *GceTemplateBuilder) MigOsInfo(migId string, template *gce.InstanceTempl
}

// BuildNodeFromTemplate builds node from provided GCE template.
func (t *GceTemplateBuilder) BuildNodeFromTemplate(mig Mig, migOsInfo MigOsInfo, template *gce.InstanceTemplate, cpu int64, mem int64, pods *int64, reserved OsReservedCalculator) (*apiv1.Node, error) {
func (t *GceTemplateBuilder) BuildNodeFromTemplate(mig Mig, migOsInfo MigOsInfo, template *gce.InstanceTemplate, cpu int64, mem int64, pods *int64, reserved OsReservedCalculator, localSSDSizeProvider localssdsize.LocalSSDSizeProvider) (*apiv1.Node, error) {

if template.Properties == nil {
return nil, fmt.Errorf("instance template %s has no properties", template.Name)
Expand Down Expand Up @@ -222,7 +219,8 @@ func (t *GceTemplateBuilder) BuildNodeFromTemplate(mig Mig, migOsInfo MigOsInfo,
}
ephemeralStorageLocalSsdCount := ephemeralStorageLocalSSDCount(kubeEnvValue)
if err == nil && ephemeralStorageLocalSsdCount > 0 {
ephemeralStorage, err = getEphemeralStorageOnLocalSsd(localSsdCount, ephemeralStorageLocalSsdCount)
localSSDDiskSize := localSSDSizeProvider.SSDSizeInGiB(template.Properties.MachineType)
ephemeralStorage, err = getEphemeralStorageOnLocalSsd(localSsdCount, ephemeralStorageLocalSsdCount, int64(localSSDDiskSize))
}
if err != nil {
return nil, fmt.Errorf("could not fetch ephemeral storage from instance template: %v", err)
Expand Down Expand Up @@ -324,11 +322,11 @@ func getLocalSsdCount(instanceProperties *gce.InstanceProperties) (int64, error)
return count, nil
}

func getEphemeralStorageOnLocalSsd(localSsdCount, ephemeralStorageLocalSsdCount int64) (int64, error) {
func getEphemeralStorageOnLocalSsd(localSsdCount, ephemeralStorageLocalSsdCount, localSSDDiskSizeInGiB int64) (int64, error) {
if localSsdCount < ephemeralStorageLocalSsdCount {
return 0, fmt.Errorf("actual local SSD count is lower than ephemeral_storage_local_ssd_count")
}
return ephemeralStorageLocalSsdCount * LocalSSDDiskSizeInGiB * units.GiB, nil
return ephemeralStorageLocalSsdCount * localSSDDiskSizeInGiB * units.GiB, nil
}

// isBootDiskEphemeralStorageWithInstanceTemplateDisabled will allow bypassing Disk Size of Boot Disk from being
Expand Down
9 changes: 6 additions & 3 deletions cluster-autoscaler/cloudprovider/gce/templates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"testing"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/gce/localssdsize"
"k8s.io/autoscaler/cluster-autoscaler/config"
gpuUtils "k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
Expand Down Expand Up @@ -267,7 +268,8 @@ func TestBuildNodeFromTemplateSetsResources(t *testing.T) {
assert.Error(t, err)
return
}
node, err := tb.BuildNodeFromTemplate(mig, migOsInfo, template, tc.physicalCpu, tc.physicalMemory, tc.pods, &GceReserved{})
localSSDDiskSize := localssdsize.NewSimpleLocalSSDProvider()
node, err := tb.BuildNodeFromTemplate(mig, migOsInfo, template, tc.physicalCpu, tc.physicalMemory, tc.pods, &GceReserved{}, localSSDDiskSize)
if tc.expectedNodeTemplateErr {
assert.Error(t, err)
} else {
Expand All @@ -293,7 +295,7 @@ func TestBuildNodeFromTemplateSetsResources(t *testing.T) {
// specifying physicalEphemeralStorageGiB in the testCase struct
physicalEphemeralStorageGiB := tc.bootDiskSizeGiB
if tc.ephemeralStorageLocalSSDCount > 0 {
physicalEphemeralStorageGiB = tc.ephemeralStorageLocalSSDCount * LocalSSDDiskSizeInGiB
physicalEphemeralStorageGiB = tc.ephemeralStorageLocalSSDCount * int64(localSSDDiskSize.SSDSizeInGiB(template.Properties.MachineType))
} else if tc.isEphemeralStorageBlocked {
physicalEphemeralStorageGiB = 0
}
Expand Down Expand Up @@ -1414,7 +1416,8 @@ func TestBuildNodeFromTemplateArch(t *testing.T) {
if gotErr != nil {
t.Fatalf("MigOsInfo unexpected error: %v", gotErr)
}
gotNode, gotErr := tb.BuildNodeFromTemplate(mig, migOsInfo, template, 16, 128, nil, &GceReserved{})
localSSDDiskSize := localssdsize.NewSimpleLocalSSDProvider()
gotNode, gotErr := tb.BuildNodeFromTemplate(mig, migOsInfo, template, 16, 128, nil, &GceReserved{}, localSSDDiskSize)
if gotErr != nil {
t.Fatalf("BuildNodeFromTemplate unexpected error: %v", gotErr)
}
Expand Down
3 changes: 3 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package config
import (
"time"

gce_localssdsize "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/gce/localssdsize"
kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"
scheduler_config "k8s.io/kubernetes/pkg/scheduler/apis/config"
)
Expand Down Expand Up @@ -63,6 +64,8 @@ type GCEOptions struct {
MigInstancesMinRefreshWaitTime time.Duration
// DomainUrl is the GCE url used to make calls to GCE API.
DomainUrl string
// LocalSSDDiskSizeProvider provides local ssd disk size based on machine type
LocalSSDDiskSizeProvider gce_localssdsize.LocalSSDSizeProvider
}

const (
Expand Down
Loading

0 comments on commit e7ff1cd

Please sign in to comment.