diff --git a/cluster-autoscaler/README.md b/cluster-autoscaler/README.md index 77d2a797017e..efc8106f6a95 100644 --- a/cluster-autoscaler/README.md +++ b/cluster-autoscaler/README.md @@ -32,6 +32,7 @@ You should also take a look at the notes and "gotchas" for your specific cloud p * [BizflyCloud](./cloudprovider/bizflycloud/README.md) * [Vultr](./cloudprovider/vultr/README.md) * [TencentCloud](./cloudprovider/tencentcloud/README.md) +* [Scaleway](./cloudprovider/scaleway/README.md) # Releases diff --git a/cluster-autoscaler/cloudprovider/builder/builder_all.go b/cluster-autoscaler/cloudprovider/builder/builder_all.go index 0405557dd330..b953ecb117f7 100644 --- a/cluster-autoscaler/cloudprovider/builder/builder_all.go +++ b/cluster-autoscaler/cloudprovider/builder/builder_all.go @@ -1,5 +1,5 @@ -//go:build !gce && !aws && !azure && !kubemark && !alicloud && !magnum && !digitalocean && !clusterapi && !huaweicloud && !ionoscloud && !linode && !hetzner && !bizflycloud && !brightbox && !packet && !oci && !vultr && !tencentcloud && !externalgrpc && !civo -// +build !gce,!aws,!azure,!kubemark,!alicloud,!magnum,!digitalocean,!clusterapi,!huaweicloud,!ionoscloud,!linode,!hetzner,!bizflycloud,!brightbox,!packet,!oci,!vultr,!tencentcloud,!externalgrpc,!civo +//go:build !gce && !aws && !azure && !kubemark && !alicloud && !magnum && !digitalocean && !clusterapi && !huaweicloud && !ionoscloud && !linode && !hetzner && !bizflycloud && !brightbox && !packet && !oci && !vultr && !tencentcloud && !scaleway && !externalgrpc && !civo +// +build !gce,!aws,!azure,!kubemark,!alicloud,!magnum,!digitalocean,!clusterapi,!huaweicloud,!ionoscloud,!linode,!hetzner,!bizflycloud,!brightbox,!packet,!oci,!vultr,!tencentcloud,!scaleway,!externalgrpc,!civo /* Copyright 2018 The Kubernetes Authors. @@ -43,6 +43,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/oci" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/ovhcloud" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/packet" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/scaleway" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/tencentcloud" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/vultr" "k8s.io/autoscaler/cluster-autoscaler/config" @@ -74,6 +75,7 @@ var AvailableCloudProviders = []string{ cloudprovider.VultrProviderName, cloudprovider.TencentcloudProviderName, cloudprovider.CivoProviderName, + cloudprovider.ScalewayProviderName, } // DefaultCloudProvider is GCE. @@ -129,6 +131,8 @@ func buildCloudProvider(opts config.AutoscalingOptions, do cloudprovider.NodeGro return tencentcloud.BuildTencentcloud(opts, do, rl) case cloudprovider.CivoProviderName: return civo.BuildCivo(opts, do, rl) + case cloudprovider.ScalewayProviderName: + return scaleway.BuildScaleway(opts, do, rl) } return nil } diff --git a/cluster-autoscaler/cloudprovider/builder/builder_scaleway.go b/cluster-autoscaler/cloudprovider/builder/builder_scaleway.go new file mode 100644 index 000000000000..558fb3e0af76 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/builder/builder_scaleway.go @@ -0,0 +1,43 @@ +//go:build scaleway +// +build scaleway + +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package builder + +import ( + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/scaleway" + "k8s.io/autoscaler/cluster-autoscaler/config" +) + +// AvailableCloudProviders supported by the scaleway cloud provider builder. +var AvailableCloudProviders = []string{ + cloudprovider.ScalewayProviderName, +} + +// DefaultCloudProvider for do-only build is Scaleway. +const DefaultCloudProvider = cloudprovider.ScalewayProviderName + +func buildCloudProvider(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider { + switch opts.CloudProviderName { + case cloudprovider.ScalewayProviderName: + return scaleway.BuildScaleway(opts, do, rl) + } + + return nil +} diff --git a/cluster-autoscaler/cloudprovider/cloud_provider.go b/cluster-autoscaler/cloudprovider/cloud_provider.go index b35a41a859ae..232702d48558 100644 --- a/cluster-autoscaler/cloudprovider/cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/cloud_provider.go @@ -68,6 +68,8 @@ const ( OVHcloudProviderName = "ovhcloud" // LinodeProviderName gets the provider name of linode LinodeProviderName = "linode" + // ScalewayProviderName gets the provider name of scaleway + ScalewayProviderName = "scaleway" // VultrProviderName gets the provider name of vultr VultrProviderName = "vultr" // PacketProviderName gets the provider name of packet diff --git a/cluster-autoscaler/cloudprovider/scaleway/OWNERS b/cluster-autoscaler/cloudprovider/scaleway/OWNERS new file mode 100644 index 000000000000..0765b7c70703 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/scaleway/OWNERS @@ -0,0 +1,6 @@ +#approvers: +#- louisportay +#- jtherin +reviewers: +- remyleone +- Sh4d1 diff --git a/cluster-autoscaler/cloudprovider/scaleway/README.md b/cluster-autoscaler/cloudprovider/scaleway/README.md new file mode 100644 index 000000000000..5d446ef6910a --- /dev/null +++ b/cluster-autoscaler/cloudprovider/scaleway/README.md @@ -0,0 +1,29 @@ +# Cluster Autoscaler for Scaleway + +The Scaleway Cloud Provider implementation scales nodes on different pools +attached to a Kapsule cluster. It can be configured from Scaleway Kapsule API. +The cluster pools need to have the option `Autoscaling` set to true to be managed by the autoscaler. + +## Configuration + +Cluster Autoscaler can be configured with 2 options +### Config file +a config file can be passed with the `--cloud-config` flag. +here is the corresponding JSON schema: +* `cluster_id`: Kapsule Cluster Id +* `secret_key`: Secret Key used to manage associated Kapsule resources +* `region`: Region where the control-plane is runnning +* `api_url`: URL to contact Scaleway, defaults to `api.scaleway.com` + +### Env variables + +The values expected by the autoscaler are the same as above + +- `CLUSTER_ID` +- `SCW_SECRET_KEY` +- `SCW_REGION` +- `SCW_API_URL` + +## Notes + +k8s nodes are identified through `node.Spec.ProviderId`, the scaleway node name or id MUST NOT be used. diff --git a/cluster-autoscaler/cloudprovider/scaleway/scaleway_cloud_provider.go b/cluster-autoscaler/cloudprovider/scaleway/scaleway_cloud_provider.go new file mode 100644 index 000000000000..9d1608c57121 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/scaleway/scaleway_cloud_provider.go @@ -0,0 +1,266 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scaleway + +import ( + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "math" + "os" + "time" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/scaleway/scalewaygo" + "k8s.io/autoscaler/cluster-autoscaler/config" + ca_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + "k8s.io/klog/v2" +) + +const ( + // GPULabel is the label added to GPU nodes + GPULabel = "k8s.scaleway.com/gpu" +) + +type scalewayCloudProvider struct { + // client talks to Kapsule API + client scalewaygo.Client + // ClusterID is the cluster id where the Autoscaler is running. + clusterID string + // nodeGroups is an abstraction around the Pool object returned by the API + nodeGroups []*NodeGroup + + resourceLimiter *cloudprovider.ResourceLimiter +} + +func readConf(config *scalewaygo.Config, configFile io.Reader) error { + body, err := ioutil.ReadAll(configFile) + if err != nil { + return err + } + err = json.Unmarshal(body, config) + return err +} + +func newScalewayCloudProvider(configFile io.Reader, defaultUserAgent string, rl *cloudprovider.ResourceLimiter) *scalewayCloudProvider { + getenvOr := func(key, defaultValue string) string { + value := os.Getenv(key) + if value != "" { + return value + } + return defaultValue + } + + // Config file passed with `cloud-config` flag + cfg := scalewaygo.Config{} + if configFile != nil { + err := readConf(&cfg, configFile) + if err != nil { + klog.Errorf("failed to read/parse scaleway config file: %s", err) + } + } + + // env takes precedence over config passed by command-line + cfg.ClusterID = getenvOr("CLUSTER_ID", cfg.ClusterID) + cfg.SecretKey = getenvOr("SCW_SECRET_KEY", cfg.SecretKey) + cfg.Region = getenvOr("SCW_REGION", cfg.Region) + cfg.ApiUrl = getenvOr("SCW_API_URL", cfg.ApiUrl) + + cfg.UserAgent = defaultUserAgent + + client, err := scalewaygo.NewClient(cfg) + if err != nil { + klog.Fatalf("failed to create scaleway cloud provider: %v", err) + } + + klog.V(4).Infof("Scaleway Cloud Provider built; ClusterId=%s,SecretKey=%s-***,Region=%s,ApiURL=%s", cfg.ClusterID, client.Token()[:8], client.Region(), client.ApiURL()) + + return &scalewayCloudProvider{ + client: client, + clusterID: cfg.ClusterID, + resourceLimiter: rl, + } +} + +// BuildScaleway returns CloudProvider implementation for Scaleway. +func BuildScaleway( + opts config.AutoscalingOptions, + do cloudprovider.NodeGroupDiscoveryOptions, + rl *cloudprovider.ResourceLimiter, +) cloudprovider.CloudProvider { + var configFile io.Reader + + if opts.CloudConfig != "" { + configFile, err := os.Open(opts.CloudConfig) + + if err != nil { + klog.Errorf("could not open scaleway configuration %s: %s", opts.CloudConfig, err) + } else { + defer func() { + err = configFile.Close() + if err != nil { + klog.Errorf("failed to close scaleway config file: %s", err) + } + }() + } + } + return newScalewayCloudProvider(configFile, opts.UserAgent, rl) +} + +// Name returns 'scaleway' +func (*scalewayCloudProvider) Name() string { + return cloudprovider.ScalewayProviderName +} + +// NodeGroups returns all node groups configured for this cluster. +// critical endpoint, make it fast +func (scw *scalewayCloudProvider) NodeGroups() []cloudprovider.NodeGroup { + + klog.V(4).Info("NodeGroups,ClusterID=", scw.clusterID) + + nodeGroups := make([]cloudprovider.NodeGroup, len(scw.nodeGroups)) + for i, ng := range scw.nodeGroups { + nodeGroups[i] = ng + } + return nodeGroups +} + +func (scw *scalewayCloudProvider) nodeGroupForNode(node *apiv1.Node) (*NodeGroup, error) { + for _, ng := range scw.nodeGroups { + if _, ok := ng.nodes[node.Spec.ProviderID]; ok { + return ng, nil + } + } + return nil, nil +} + +// NodeGroupForNode returns the node group for the given node, nil if the node +// should not be processed by cluster autoscaler, or non-nil error if such +// occurred. +// critical endpoint, make it fast +func (scw *scalewayCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovider.NodeGroup, error) { + klog.V(4).Infof("NodeGroupForNode,NodeSpecProviderID=%s", node.Spec.ProviderID) + + return scw.nodeGroupForNode(node) +} + +func (scw *scalewayCloudProvider) NodePrice(node *apiv1.Node, startTime time.Time, endTime time.Time) (float64, error) { + ng, err := scw.nodeGroupForNode(node) + if err != nil { + return 0.0, err + } + + d := endTime.Sub(startTime) + hours := math.Ceil(d.Hours()) + + return hours * float64(ng.specs.NodePricePerHour), nil +} + +func (scw *scalewayCloudProvider) PodPrice(pod *apiv1.Pod, startTime time.Time, endTime time.Time) (float64, error) { + return 0.0, nil +} + +// Pricing return pricing model for scaleway. +func (scw *scalewayCloudProvider) Pricing() (cloudprovider.PricingModel, ca_errors.AutoscalerError) { + klog.V(4).Info("Pricing,called") + return scw, nil +} + +// GetAvailableMachineTypes get all machine types that can be requested from scaleway. +// Not implemented +func (scw *scalewayCloudProvider) GetAvailableMachineTypes() ([]string, error) { + return []string{}, nil +} + +func (scw *scalewayCloudProvider) NewNodeGroup( + machineType string, + labels map[string]string, + systemLabels map[string]string, + taints []apiv1.Taint, + extraResources map[string]resource.Quantity, +) (cloudprovider.NodeGroup, error) { + klog.V(4).Info("NewNodeGroup,called") + return nil, cloudprovider.ErrNotImplemented +} + +// GetResourceLimiter returns struct containing limits (max, min) for resources (cores, memory etc.). +func (scw *scalewayCloudProvider) GetResourceLimiter() (*cloudprovider.ResourceLimiter, error) { + klog.V(4).Info("GetResourceLimiter,called") + return scw.resourceLimiter, nil +} + +// GPULabel returns the label added to nodes with GPU resource. +func (scw *scalewayCloudProvider) GPULabel() string { + klog.V(6).Info("GPULabel,called") + return GPULabel +} + +// GetAvailableGPUTypes return all available GPU types cloud provider supports. +// not yet implemented. +func (scw *scalewayCloudProvider) GetAvailableGPUTypes() map[string]struct{} { + klog.V(4).Info("GetAvailableGPUTypes,called") + return nil +} + +// Cleanup cleans up open resources before the cloud provider is destroyed, i.e. go routines etc. +func (scw *scalewayCloudProvider) Cleanup() error { + klog.V(4).Info("Cleanup,called") + return nil +} + +// Refresh is called before every main loop and can be used to dynamically update cloud provider state. +// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh(). +func (scw *scalewayCloudProvider) Refresh() error { + klog.V(4).Info("Refresh,ClusterID=", scw.clusterID) + + ctx := context.Background() + resp, err := scw.client.ListPools(ctx, &scalewaygo.ListPoolsRequest{ClusterID: scw.clusterID}) + + if err != nil { + klog.Errorf("Refresh,failed to list pools for cluster %s: %s", scw.clusterID, err) + return err + } + + var ng []*NodeGroup + + for _, p := range resp.Pools { + + if p.Pool.Autoscaling == false { + continue + } + + nodes, err := nodesFromPool(scw.client, p.Pool) + if err != nil { + return fmt.Errorf("Refresh,failed to list nodes for pool %s: %w", p.Pool.ID, err) + } + ng = append(ng, &NodeGroup{ + Client: scw.client, + nodes: nodes, + specs: &p.Specs, + p: p.Pool, + }) + } + klog.V(4).Infof("Refresh,ClusterID=%s,%d pools found", scw.clusterID, len(ng)) + + scw.nodeGroups = ng + + return nil +} diff --git a/cluster-autoscaler/cloudprovider/scaleway/scaleway_node_group.go b/cluster-autoscaler/cloudprovider/scaleway/scaleway_node_group.go new file mode 100644 index 000000000000..c70d12ef5c0f --- /dev/null +++ b/cluster-autoscaler/cloudprovider/scaleway/scaleway_node_group.go @@ -0,0 +1,356 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scaleway + +import ( + "context" + "errors" + "fmt" + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/scaleway/scalewaygo" + "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" + "k8s.io/klog/v2" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" + "strings" +) + +// NodeGroup implements cloudprovider.NodeGroup interface. +// it is used to resize a Scaleway Pool which is a group of nodes with the same capacity. +type NodeGroup struct { + scalewaygo.Client + + nodes map[string]*scalewaygo.Node + specs *scalewaygo.GenericNodeSpecs + p *scalewaygo.Pool +} + +// MaxSize returns maximum size of the node group. +func (ng *NodeGroup) MaxSize() int { + klog.V(6).Info("MaxSize,called") + + return int(ng.p.MaxSize) +} + +// MinSize returns minimum size of the node group. +func (ng *NodeGroup) MinSize() int { + klog.V(6).Info("MinSize,called") + + return int(ng.p.MinSize) +} + +// TargetSize returns the current target size of the node group. It is possible that the +// number of nodes in Kubernetes is different at the moment but should be equal +// to Size() once everything stabilizes (new nodes finish startup and registration or +// removed nodes are deleted completely). +func (ng *NodeGroup) TargetSize() (int, error) { + klog.V(6).Info("TargetSize,called") + return int(ng.p.Size), nil +} + +// IncreaseSize increases the size of the node group. To delete a node you need +// to explicitly name it and use DeleteNode. This function should wait until +// node group size is updated. +func (ng *NodeGroup) IncreaseSize(delta int) error { + + klog.V(4).Infof("IncreaseSize,ClusterID=%s,delta=%d", ng.p.ClusterID, delta) + + if delta <= 0 { + return fmt.Errorf("delta must be strictly positive, have: %d", delta) + } + + targetSize := ng.p.Size + uint32(delta) + + if targetSize > uint32(ng.MaxSize()) { + return fmt.Errorf("size increase is too large. current: %d desired: %d max: %d", + ng.p.Size, targetSize, ng.MaxSize()) + } + + ctx := context.Background() + pool, err := ng.UpdatePool(ctx, &scalewaygo.UpdatePoolRequest{ + PoolID: ng.p.ID, + Size: &targetSize, + }) + if err != nil { + return err + } + + if pool.Size != targetSize { + return fmt.Errorf("couldn't increase size to %d. Current size is: %d", + targetSize, pool.Size) + } + + ng.p.Size = targetSize + return nil +} + +// DeleteNodes deletes nodes from this node group. Error is returned either on +// failure or if the given node doesn't belong to this node group. This function +// should wait until node group size is updated. +func (ng *NodeGroup) DeleteNodes(nodes []*apiv1.Node) error { + ctx := context.Background() + klog.V(4).Info("DeleteNodes,", len(nodes), " nodes to reclaim") + for _, n := range nodes { + + node, ok := ng.nodes[n.Spec.ProviderID] + if !ok { + klog.Errorf("DeleteNodes,ProviderID=%s,PoolID=%s,node marked for deletion not found in pool", n.Spec.ProviderID, ng.p.ID) + continue + } + + updatedNode, err := ng.DeleteNode(ctx, &scalewaygo.DeleteNodeRequest{ + NodeID: node.ID, + }) + if err != nil || updatedNode.Status != scalewaygo.NodeStatusDeleting { + return err + } + + ng.p.Size-- + ng.nodes[n.Spec.ProviderID].Status = scalewaygo.NodeStatusDeleting + } + + return nil +} + +// DecreaseTargetSize decreases the target size of the node group. This function +// doesn't permit to delete any existing node and can be used only to reduce the +// request for new nodes that have not been yet fulfilled. Delta should be negative. +// It is assumed that cloud provider will not delete the existing nodes when there +// is an option to just decrease the target. +func (ng *NodeGroup) DecreaseTargetSize(delta int) error { + + klog.V(4).Infof("DecreaseTargetSize,ClusterID=%s,delta=%d", ng.p.ClusterID, delta) + + if delta >= 0 { + return fmt.Errorf("delta must be strictly negative, have: %d", delta) + } + + targetSize := ng.p.Size + uint32(delta) + if int(targetSize) < ng.MinSize() { + return fmt.Errorf("size decrease is too large. current: %d desired: %d min: %d", + ng.p.Size, targetSize, ng.MinSize()) + } + + ctx := context.Background() + pool, err := ng.UpdatePool(ctx, &scalewaygo.UpdatePoolRequest{ + PoolID: ng.p.ID, + Size: &targetSize, + }) + if err != nil { + return err + } + + if pool.Size != targetSize { + return fmt.Errorf("couldn't decrease size to %d. Current size is: %d", + targetSize, pool.Size) + } + + ng.p.Size = targetSize + return nil +} + +// Id returns an unique identifier of the node group. +func (ng *NodeGroup) Id() string { + return ng.p.ID +} + +// Debug returns a string containing all information regarding this node group. +func (ng *NodeGroup) Debug() string { + klog.V(4).Info("Debug,called") + return fmt.Sprintf("id:%s,status:%s,version:%s,autoscaling:%t,size:%d,min_size:%d,max_size:%d", ng.Id(), ng.p.Status, ng.p.Version, ng.p.Autoscaling, ng.p.Size, ng.MinSize(), ng.MaxSize()) +} + +// Nodes returns a list of all nodes that belong to this node group. +func (ng *NodeGroup) Nodes() ([]cloudprovider.Instance, error) { + var nodes []cloudprovider.Instance + + klog.V(4).Info("Nodes,PoolID=", ng.p.ID) + + for _, node := range ng.nodes { + nodes = append(nodes, cloudprovider.Instance{ + Id: node.ProviderID, + Status: fromScwStatus(node.Status), + }) + } + + return nodes, nil +} + +// TemplateNodeInfo returns a schedulerframework.NodeInfo structure of an empty +// (as if just started) node. This will be used in scale-up simulations to +// predict what would a new node look like if a node group was expanded. The returned +// NodeInfo is expected to have a fully populated Node object, with all of the labels, +// capacity and allocatable information as well as all pods that are started on +// the node by default, using manifest (most likely only kube-proxy). +func (ng *NodeGroup) TemplateNodeInfo() (*schedulerframework.NodeInfo, error) { + klog.V(4).Infof("TemplateNodeInfo,PoolID=%s", ng.p.ID) + node := apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: ng.specs.Labels[apiv1.LabelHostname], + Labels: ng.specs.Labels, + }, + Status: apiv1.NodeStatus{ + Capacity: apiv1.ResourceList{}, + Allocatable: apiv1.ResourceList{}, + }, + } + node.Status.Capacity[apiv1.ResourceCPU] = *resource.NewQuantity(int64(ng.specs.CpuCapacity), resource.DecimalSI) + node.Status.Capacity[apiv1.ResourceMemory] = *resource.NewQuantity(int64(ng.specs.MemoryCapacity), resource.DecimalSI) + node.Status.Capacity[apiv1.ResourceEphemeralStorage] = *resource.NewQuantity(int64(ng.specs.LocalStorageCapacity), resource.DecimalSI) + node.Status.Capacity[apiv1.ResourcePods] = *resource.NewQuantity(int64(ng.specs.MaxPods), resource.DecimalSI) + + node.Status.Allocatable[apiv1.ResourceCPU] = *resource.NewQuantity(int64(ng.specs.CpuAllocatable), resource.DecimalSI) + node.Status.Allocatable[apiv1.ResourceMemory] = *resource.NewQuantity(int64(ng.specs.MemoryAllocatable), resource.DecimalSI) + node.Status.Allocatable[apiv1.ResourceEphemeralStorage] = *resource.NewQuantity(int64(ng.specs.LocalStorageAllocatable), resource.DecimalSI) + node.Status.Allocatable[apiv1.ResourcePods] = *resource.NewQuantity(int64(ng.specs.MaxPods), resource.DecimalSI) + + if ng.specs.Gpu > 0 { + nbGpu := *resource.NewQuantity(int64(ng.specs.Gpu), resource.DecimalSI) + node.Status.Capacity[gpu.ResourceNvidiaGPU] = nbGpu + node.Status.Allocatable[gpu.ResourceNvidiaGPU] = nbGpu + } + + node.Status.Conditions = cloudprovider.BuildReadyConditions() + node.Spec.Taints = parseTaints(ng.specs.Taints) + + nodeInfo := schedulerframework.NewNodeInfo(cloudprovider.BuildKubeProxy(ng.p.Name)) + nodeInfo.SetNode(&node) + return nodeInfo, nil +} + +func parseTaints(taints map[string]string) []apiv1.Taint { + k8sTaints := make([]apiv1.Taint, 0, len(taints)) + + for key, valueEffect := range taints { + + splittedValueEffect := strings.Split(valueEffect, ":") + var taint apiv1.Taint + + switch apiv1.TaintEffect(splittedValueEffect[len(splittedValueEffect)-1]) { + case apiv1.TaintEffectNoExecute: + taint.Effect = apiv1.TaintEffectNoExecute + case apiv1.TaintEffectNoSchedule: + taint.Effect = apiv1.TaintEffectNoSchedule + case apiv1.TaintEffectPreferNoSchedule: + taint.Effect = apiv1.TaintEffectPreferNoSchedule + default: + continue + } + if len(splittedValueEffect) == 2 { + taint.Value = splittedValueEffect[0] + } + taint.Key = key + + k8sTaints = append(k8sTaints, taint) + } + return k8sTaints +} + +// Exist checks if the node group really exists on the cloud provider side. Allows to tell the +// theoretical node group from the real one. +func (ng *NodeGroup) Exist() bool { + + klog.V(4).Infof("Exist,PoolID=%s", ng.p.ID) + + _, err := ng.GetPool(context.Background(), &scalewaygo.GetPoolRequest{ + PoolID: ng.p.ID, + }) + if err != nil && errors.Is(err, scalewaygo.ErrClientSide) { + return false + } + return true + +} + +// Pool Autoprovision feature is not supported by Scaleway + +// Create creates the node group on the cloud provider side. +func (ng *NodeGroup) Create() (cloudprovider.NodeGroup, error) { + return nil, cloudprovider.ErrNotImplemented +} + +// Delete deletes the node group on the cloud provider side. +func (ng *NodeGroup) Delete() error { + return cloudprovider.ErrNotImplemented +} + +// Autoprovisioned returns true if the node group is autoprovisioned. +func (ng *NodeGroup) Autoprovisioned() bool { + return false +} + +// GetOptions returns nil which means 'use defaults options' +func (ng *NodeGroup) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) { + return nil, cloudprovider.ErrNotImplemented +} + +// nodesFromPool returns the nodes associated to a Scaleway Pool +func nodesFromPool(client scalewaygo.Client, p *scalewaygo.Pool) (map[string]*scalewaygo.Node, error) { + + ctx := context.Background() + resp, err := client.ListNodes(ctx, &scalewaygo.ListNodesRequest{ClusterID: p.ClusterID, PoolID: &p.ID}) + if err != nil { + return nil, err + } + + nodes := make(map[string]*scalewaygo.Node) + for _, node := range resp.Nodes { + nodes[node.ProviderID] = node + } + + klog.V(4).Infof("nodesFromPool,PoolID=%s,%d nodes found", p.ID, len(nodes)) + + return nodes, nil +} + +func fromScwStatus(status scalewaygo.NodeStatus) *cloudprovider.InstanceStatus { + st := &cloudprovider.InstanceStatus{} + switch status { + case scalewaygo.NodeStatusReady: + st.State = cloudprovider.InstanceRunning + case scalewaygo.NodeStatusCreating, scalewaygo.NodeStatusStarting, + scalewaygo.NodeStatusRegistering, scalewaygo.NodeStatusNotReady, + scalewaygo.NodeStatusUpgrading, scalewaygo.NodeStatusRebooting: + st.State = cloudprovider.InstanceCreating + case scalewaygo.NodeStatusDeleting: + st.State = cloudprovider.InstanceDeleting + case scalewaygo.NodeStatusCreationError: + st.ErrorInfo = &cloudprovider.InstanceErrorInfo{ + ErrorCode: string(scalewaygo.NodeStatusCreationError), + ErrorMessage: "scaleway node could not be created", + } + case scalewaygo.NodeStatusDeleted: + st.ErrorInfo = &cloudprovider.InstanceErrorInfo{ + ErrorCode: string(scalewaygo.NodeStatusDeleted), + ErrorMessage: "node has already been deleted", + } + case scalewaygo.NodeStatusLocked: + st.ErrorInfo = &cloudprovider.InstanceErrorInfo{ + ErrorCode: string(scalewaygo.NodeStatusLocked), + ErrorMessage: "node is locked for legal reasons", + } + default: + st.ErrorInfo = &cloudprovider.InstanceErrorInfo{ + ErrorCode: string(status), + } + } + + return st +} diff --git a/cluster-autoscaler/cloudprovider/scaleway/scaleway_node_group_test.go b/cluster-autoscaler/cloudprovider/scaleway/scaleway_node_group_test.go new file mode 100644 index 000000000000..b971cbdaff03 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/scaleway/scaleway_node_group_test.go @@ -0,0 +1,238 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scaleway + +import ( + "context" + "errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/scaleway/scalewaygo" + "testing" +) + +func TestNodeGroup_TargetSize(t *testing.T) { + var nodesNb uint32 = 3 + + ng := &NodeGroup{ + p: &scalewaygo.Pool{ + Size: nodesNb, + }, + } + size, err := ng.TargetSize() + assert.NoError(t, err) + assert.Equal(t, int(nodesNb), size, "target size is wrong") +} + +func TestNodeGroup_IncreaseSize(t *testing.T) { + ctx := context.Background() + nodesNb := 3 + delta := 2 + client := &clientMock{} + ng := &NodeGroup{ + Client: client, + p: &scalewaygo.Pool{ + Size: uint32(nodesNb), + MinSize: 1, + MaxSize: 10, + Autoscaling: true, + }, + } + + newSize := uint32(nodesNb + delta) + client.On("UpdatePool", + ctx, + &scalewaygo.UpdatePoolRequest{ + PoolID: ng.p.ID, + Size: &newSize, + }).Return( + &scalewaygo.Pool{ + Size: newSize, + }, nil, + ).Once() + err := ng.IncreaseSize(delta) + assert.NoError(t, err) +} + +func TestNodeGroup_IncreaseNegativeDelta(t *testing.T) { + nodesNb := 3 + delta := -2 + client := &clientMock{} + ng := &NodeGroup{ + Client: client, + p: &scalewaygo.Pool{ + Size: uint32(nodesNb), + }, + } + + err := ng.IncreaseSize(delta) + assert.Error(t, err) +} + +func TestNodeGroup_IncreaseAboveMaximum(t *testing.T) { + nodesNb := 3 + delta := 10 + client := &clientMock{} + ng := &NodeGroup{ + Client: client, + p: &scalewaygo.Pool{ + Size: uint32(nodesNb), + MaxSize: 10, + }, + } + + err := ng.IncreaseSize(delta) + assert.Error(t, err) +} + +func TestNodeGroup_DecreaseTargetSize(t *testing.T) { + ctx := context.Background() + nodesNb := 5 + delta := -4 + client := &clientMock{} + ng := &NodeGroup{ + Client: client, + p: &scalewaygo.Pool{ + Size: uint32(nodesNb), + MinSize: 1, + MaxSize: 10, + Autoscaling: true, + }, + } + + newSize := uint32(nodesNb + delta) + client.On("UpdatePool", + ctx, + &scalewaygo.UpdatePoolRequest{ + PoolID: ng.p.ID, + Size: &newSize, + }).Return( + &scalewaygo.Pool{ + Size: newSize, + }, nil, + ).Once() + err := ng.DecreaseTargetSize(delta) + assert.NoError(t, err) +} + +func TestNodeGroup_DecreaseTargetSizePositiveDelta(t *testing.T) { + nodesNb := 3 + delta := 2 + client := &clientMock{} + ng := &NodeGroup{ + Client: client, + p: &scalewaygo.Pool{ + Size: uint32(nodesNb), + }, + } + + err := ng.DecreaseTargetSize(delta) + assert.Error(t, err) +} + +func TestNodeGroup_DecreaseBelowMinimum(t *testing.T) { + nodesNb := 3 + delta := -3 + client := &clientMock{} + ng := &NodeGroup{ + Client: client, + p: &scalewaygo.Pool{ + Size: uint32(nodesNb), + MinSize: 1, + }, + } + + err := ng.DecreaseTargetSize(delta) + assert.Error(t, err) +} + +func TestNodeGroup_DeleteNodes(t *testing.T) { + ctx := context.Background() + client := &clientMock{} + ng := &NodeGroup{ + Client: client, + nodes: map[string]*scalewaygo.Node{ + "scaleway://instance/fr-par-1/f80ce5b1-7c77-4177-bd5f-0d803f5b7c15": {ID: "6852824b-e409-4c77-94df-819629d135b9", ProviderID: "scaleway://instance/fr-par-1/f80ce5b1-7c77-4177-bd5f-0d803f5b7c15"}, + "scaleway://instance/fr-srr-1/6c22c989-ddce-41d8-98cb-2aea83c72066": {ID: "84acb1a6-0e14-4j36-8b32-71bf7b328c22", ProviderID: "scaleway://instance/fr-srr-1/6c22c989-ddce-41d8-98cb-2aea83c72066"}, + "scaleway://instance/fr-srr-1/fcc3abe0-3a72-4178-8182-2a93fdc72529": {ID: "5c4d832a-d964-4c64-9d53-b9295c206cdd", ProviderID: "scaleway://instance/fr-srr-1/fcc3abe0-3a72-4178-8182-2a93fdc72529"}, + }, + p: &scalewaygo.Pool{ + Size: 3, + }, + } + + nodes := []*apiv1.Node{ + {Spec: apiv1.NodeSpec{ProviderID: "scaleway://instance/fr-par-1/f80ce5b1-7c77-4177-bd5f-0d803f5b7c15"}}, + {Spec: apiv1.NodeSpec{ProviderID: "scaleway://instance/fr-srr-1/6c22c989-ddce-41d8-98cb-2aea83c72066"}}, + {Spec: apiv1.NodeSpec{ProviderID: "scaleway://instance/fr-srr-1/fcc3abe0-3a72-4178-8182-2a93fdc72529"}}, + } + client.On("DeleteNode", ctx, &scalewaygo.DeleteNodeRequest{NodeID: ng.nodes["scaleway://instance/fr-par-1/f80ce5b1-7c77-4177-bd5f-0d803f5b7c15"].ID}).Return(&scalewaygo.Node{Status: scalewaygo.NodeStatusDeleting}, nil).Once() + client.On("DeleteNode", ctx, &scalewaygo.DeleteNodeRequest{NodeID: ng.nodes["scaleway://instance/fr-srr-1/6c22c989-ddce-41d8-98cb-2aea83c72066"].ID}).Return(&scalewaygo.Node{Status: scalewaygo.NodeStatusDeleting}, nil).Once() + client.On("DeleteNode", ctx, &scalewaygo.DeleteNodeRequest{NodeID: ng.nodes["scaleway://instance/fr-srr-1/fcc3abe0-3a72-4178-8182-2a93fdc72529"].ID}).Return(&scalewaygo.Node{Status: scalewaygo.NodeStatusDeleting}, nil).Once() + + err := ng.DeleteNodes(nodes) + assert.NoError(t, err) + assert.Equal(t, uint32(0), ng.p.Size) +} + +func TestNodeGroup_DeleteNodesErr(t *testing.T) { + ctx := context.Background() + client := &clientMock{} + ng := &NodeGroup{ + Client: client, + nodes: map[string]*scalewaygo.Node{ + "nonexistent-on-provider-side": {ID: "unknown"}, + }, + } + nodes := []*apiv1.Node{ + {Spec: apiv1.NodeSpec{ProviderID: "nonexistent-on-provider-side"}}, + } + client.On("DeleteNode", ctx, &scalewaygo.DeleteNodeRequest{NodeID: "unknown"}).Return(&scalewaygo.Node{}, errors.New("nonexistent")).Once() + + err := ng.DeleteNodes(nodes) + assert.Error(t, err) +} + +type clientMock struct { + mock.Mock +} + +func (m *clientMock) GetPool(ctx context.Context, req *scalewaygo.GetPoolRequest) (*scalewaygo.Pool, error) { + args := m.Called(ctx, req) + return args.Get(0).(*scalewaygo.Pool), args.Error(1) +} + +func (m *clientMock) ListPools(ctx context.Context, req *scalewaygo.ListPoolsRequest) (*scalewaygo.ListPoolsResponse, error) { + args := m.Called(ctx, req) + return args.Get(0).(*scalewaygo.ListPoolsResponse), args.Error(1) +} + +func (m *clientMock) UpdatePool(ctx context.Context, req *scalewaygo.UpdatePoolRequest) (*scalewaygo.Pool, error) { + args := m.Called(ctx, req) + return args.Get(0).(*scalewaygo.Pool), args.Error(1) +} + +func (m *clientMock) ListNodes(ctx context.Context, req *scalewaygo.ListNodesRequest) (*scalewaygo.ListNodesResponse, error) { + args := m.Called(ctx, req) + return args.Get(0).(*scalewaygo.ListNodesResponse), args.Error(1) +} + +func (m *clientMock) DeleteNode(ctx context.Context, req *scalewaygo.DeleteNodeRequest) (*scalewaygo.Node, error) { + args := m.Called(ctx, req) + return args.Get(0).(*scalewaygo.Node), args.Error(1) +} diff --git a/cluster-autoscaler/cloudprovider/scaleway/scalewaygo/README.md b/cluster-autoscaler/cloudprovider/scaleway/scalewaygo/README.md new file mode 100644 index 000000000000..0b69b5efa31b --- /dev/null +++ b/cluster-autoscaler/cloudprovider/scaleway/scalewaygo/README.md @@ -0,0 +1,5 @@ +# Scaleway REST API Client + +This package implements a minimal REST API client for Scaleway Kapsule API. + +It only implements endpoints used by the Scaleway Cloud Provider, it is intended to be a drop-in replacement for [scaleway-sdk-go](https://github.com/scaleway/scaleway-sdk-go). diff --git a/cluster-autoscaler/cloudprovider/scaleway/scalewaygo/scaleway_kapsule_api.go b/cluster-autoscaler/cloudprovider/scaleway/scalewaygo/scaleway_kapsule_api.go new file mode 100644 index 000000000000..83e9341303fc --- /dev/null +++ b/cluster-autoscaler/cloudprovider/scaleway/scalewaygo/scaleway_kapsule_api.go @@ -0,0 +1,624 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scalewaygo + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "k8s.io/autoscaler/cluster-autoscaler/version" + "k8s.io/klog/v2" + "net" + "net/http" + "net/url" + "time" +) + +const ( + defaultApiURL string = "https://api.scaleway.com" + defaultHTTPTimeout = 30 + pageSizeListPools uint32 = 100 + pageSizeListNodes uint32 = 100 +) + +var ( + // ErrMissingClusterID is returned when no cluster id has been found + // either in env variables or in config file + ErrMissingClusterID = errors.New("cluster ID is not provided") + // ErrMissingSecretKey is returned when no secret key has been found + // either in env variables or in config file + ErrMissingSecretKey = errors.New("scaleway secret key is not provided") + // ErrMissingRegion is returned when no region has been found + // either in env variables or in config file + ErrMissingRegion = errors.New("region is not provided") + + // ErrClientSide indicates an error on user side + ErrClientSide = errors.New("400 error type") + // ErrServerSide indicates an error on server side + ErrServerSide = errors.New("500 error type") + // ErrOther indicates a generic HTTP error + ErrOther = errors.New("generic error type") +) + +// Config is used to deserialize config file passed with flag `cloud-config` +type Config struct { + ClusterID string `json:"cluster_id"` + SecretKey string `json:"secret_key"` + Region string `json:"region"` + ApiUrl string `json:"api_url"` + UserAgent string +} + +// NewClient returns a new Client able to talk to Scaleway API +func NewClient(cfg Config) (*client, error) { + if cfg.ClusterID == "" { + return nil, ErrMissingClusterID + } + if cfg.SecretKey == "" { + return nil, ErrMissingSecretKey + } + if cfg.Region == "" { + return nil, ErrMissingRegion + } + if cfg.ApiUrl == "" { + cfg.ApiUrl = defaultApiURL + } + + hc := &http.Client{ + Timeout: defaultHTTPTimeout * time.Second, + Transport: &http.Transport{ + DialContext: (&net.Dialer{Timeout: 5 * time.Second}).DialContext, + TLSHandshakeTimeout: 5 * time.Second, + ResponseHeaderTimeout: 30 * time.Second, + }, + } + + return &client{ + httpClient: hc, + apiURL: cfg.ApiUrl, + token: cfg.SecretKey, + userAgent: fmt.Sprintf("%s/%s cluster-id/%s", cfg.UserAgent, version.ClusterAutoscalerVersion, cfg.ClusterID), + region: cfg.Region, + }, nil +} + +// scalewayRequest contains all the contents related to performing a request on the Scaleway API. +type scalewayRequest struct { + Method string + Path string + Query url.Values + Body io.Reader +} + +// Listing queries default to `fetch all resources` if no `page` is provided +// as CA needs access to all nodes and pools + +// Client is used to talk to Scaleway Kapsule API +type Client interface { + GetPool(ctx context.Context, req *GetPoolRequest) (*Pool, error) + ListPools(ctx context.Context, req *ListPoolsRequest) (*ListPoolsResponse, error) + UpdatePool(ctx context.Context, req *UpdatePoolRequest) (*Pool, error) + ListNodes(ctx context.Context, req *ListNodesRequest) (*ListNodesResponse, error) + DeleteNode(ctx context.Context, req *DeleteNodeRequest) (*Node, error) +} + +// client contains necessary information to perform API calls +type client struct { + httpClient *http.Client + apiURL string + token string + userAgent string + region string +} + +func (req *scalewayRequest) getURL(apiURL string) (*url.URL, error) { + completeURL, err := url.Parse(apiURL + req.Path) + if err != nil { + return nil, fmt.Errorf("invalid url %s: %s", apiURL+req.Path, err) + } + completeURL.RawQuery = req.Query.Encode() + + return completeURL, nil +} + +func (c *client) ApiURL() string { + return c.apiURL +} + +func (c *client) Token() string { + return c.token +} + +func (c *client) Region() string { + return c.region +} + +// do perform a single HTTP request based on the generic Request object. +func (c *client) do(ctx context.Context, req *scalewayRequest, res interface{}) error { + if req == nil { + return errors.New("request must be non-nil") + } + + // build URL + completeURL, err := req.getURL(c.apiURL) + if err != nil { + return err + } + + // build request + httpRequest, err := http.NewRequest(req.Method, completeURL.String(), req.Body) + if err != nil { + return fmt.Errorf("could not create request: %w", err) + } + + httpRequest.Header.Set("User-Agent", c.userAgent) + httpRequest.Header.Set("X-Auth-Token", c.token) + if req.Body != nil { + httpRequest.Header.Set("Content-Type", "application/json") + } + + httpRequest = httpRequest.WithContext(ctx) + + // execute request + httpResponse, err := c.httpClient.Do(httpRequest) + if err != nil { + return fmt.Errorf("error executing request: %w", err) + } + + defer func() { + if err := httpResponse.Body.Close(); err != nil { + klog.Errorf("failed to close response body: %v", err) + } + }() + + ct := httpResponse.Header.Get("Content-Type") + if ct != "application/json" { + return fmt.Errorf("unexpected content-type: %s with status: %s", ct, httpResponse.Status) + } + + err = json.NewDecoder(httpResponse.Body).Decode(&res) + if err != nil { + return fmt.Errorf("could not parse %s response body: %w", ct, err) + } + + switch { + case httpResponse.StatusCode >= 200 && httpResponse.StatusCode < 300: + return nil + case httpResponse.StatusCode >= 400 && httpResponse.StatusCode < 500: + err = ErrClientSide + case httpResponse.StatusCode >= 500 && httpResponse.StatusCode < 600: + err = ErrServerSide + default: + err = ErrOther + + } + + return fmt.Errorf("%d %v %v: %w", httpResponse.StatusCode, httpRequest.Method, httpRequest.URL, err) +} + +// NodeStatus is the state in which a node might be +type NodeStatus string + +const ( + // NodeStatusCreating indicates that node is provisioning the underlying instance/BM + NodeStatusCreating = NodeStatus("creating") + // NodeStatusStarting indicates that node is being configured and/or booting + NodeStatusStarting = NodeStatus("starting") + // NodeStatusRegistering indicates that underlying node has booted and k8s services are starting + NodeStatusRegistering = NodeStatus("registering") + // NodeStatusNotReady indicates that k8s has marked this node as `NotReady` + NodeStatusNotReady = NodeStatus("not_ready") + // NodeStatusReady indicates that node is ready for use + NodeStatusReady = NodeStatus("ready") + // NodeStatusDeleting indicates that node is being deleted + NodeStatusDeleting = NodeStatus("deleting") + // NodeStatusDeleted indicates that node is deleted + NodeStatusDeleted = NodeStatus("deleted") + // NodeStatusLocked indicates that node has been locked for legal reasons + NodeStatusLocked = NodeStatus("locked") + // NodeStatusRebooting indicates that node is rebooting + NodeStatusRebooting = NodeStatus("rebooting") + // NodeStatusCreationError indicates that node failed to create + NodeStatusCreationError = NodeStatus("creation_error") + // NodeStatusUpgrading indicates that this node CP is currently upgrading k8s version + NodeStatusUpgrading = NodeStatus("upgrading") +) + +// Node represents an instance running in a scaleway pool +type Node struct { + // ID: the ID of the node + ID string `json:"id"` + // PoolID: the pool ID of the node + PoolID string `json:"pool_id"` + // ClusterID: the cluster ID of the node + ClusterID string `json:"cluster_id"` + // ProviderID: the underlying instance ID + ProviderID string `json:"provider_id"` + // Name: the name of the node + Name string `json:"name"` + // Status: the status of the node + Status NodeStatus `json:"status"` + // CreatedAt: the date at which the node was created + CreatedAt *time.Time `json:"created_at"` + // UpdatedAt: the date at which the node was last updated + UpdatedAt *time.Time `json:"updated_at"` +} + +// PoolStatus is the state in which a pool might be (unused) +type PoolStatus string + +// These are possible statuses for a scaleway pool +const ( + PoolStatusReady = PoolStatus("ready") + PoolStatusDeleting = PoolStatus("deleting") + PoolStatusDeleted = PoolStatus("deleted") + PoolStatusScaling = PoolStatus("scaling") + PoolStatusWarning = PoolStatus("warning") + PoolStatusLocked = PoolStatus("locked") + PoolStatusUpgrading = PoolStatus("upgrading") +) + +// Pool is the abstraction used to gather nodes with the same specs +type Pool struct { + // ID: the ID of the pool + ID string `json:"id"` + // ClusterID: the cluster ID of the pool + ClusterID string `json:"cluster_id"` + // CreatedAt: the date at which the pool was created + CreatedAt *time.Time `json:"created_at"` + // UpdatedAt: the date at which the pool was last updated + UpdatedAt *time.Time `json:"updated_at"` + // Name: the name of the pool + Name string `json:"name"` + // Status: the status of the pool + Status PoolStatus `json:"status"` + // Version: the version of the pool + Version string `json:"version"` + // NodeType: the node type is the type of Scaleway Instance wanted for the pool + NodeType string `json:"node_type"` + // Autoscaling: the enablement of the autoscaling feature for the pool + Autoscaling bool `json:"autoscaling"` + // Size: the size (number of nodes) of the pool + Size uint32 `json:"size"` + // MinSize: the minimum size of the pool + MinSize uint32 `json:"min_size"` + // MaxSize: the maximum size of the pool + MaxSize uint32 `json:"max_size"` + // Zone: the zone where the nodes will be spawn in + Zone string `json:"zone"` +} + +// GetPoolRequest is passed to `GetPool` method +type GetPoolRequest struct { + // PoolID: the ID of the requested pool + PoolID string `json:"-"` +} + +// GetPool is used to request a Pool by its id +func (c *client) GetPool(ctx context.Context, req *GetPoolRequest) (*Pool, error) { + var err error + + klog.V(4).Info("GetPool,PoolID=", req.PoolID) + + if fmt.Sprint(req.PoolID) == "" { + return nil, errors.New("field PoolID cannot be empty in request") + } + + scwReq := &scalewayRequest{ + Method: "GET", + Path: "/k8s/v1/regions/" + fmt.Sprint(c.region) + "/pools/" + fmt.Sprint(req.PoolID) + "", + } + + var resp Pool + + err = c.do(ctx, scwReq, &resp) + if err != nil { + return nil, err + } + return &resp, nil +} + +// ListPoolsRequest is passed to `ListPools` method +// it can be used for optional pagination +type ListPoolsRequest struct { + // the ID of the cluster from which the pools will be listed from + ClusterID string `json:"-"` + // Page: the page number for the returned pools + Page *int32 `json:"-"` + // PageSize: the maximum number of pools per page + PageSize *uint32 `json:"-"` +} + +// GenericNodeSpecs represents NodeType specs used for scale-up simulations. +// it is used to select the appropriate pool to scale-up. +type GenericNodeSpecs struct { + NodePricePerHour float32 `json:"node_price_per_hour"` + MaxPods uint32 `json:"max_pods"` + Gpu uint32 `json:"gpu"` + CpuCapacity uint32 `json:"cpu_capacity"` + CpuAllocatable uint32 `json:"cpu_allocatable"` + MemoryCapacity uint64 `json:"memory_capacity"` + MemoryAllocatable uint64 `json:"memory_allocatable"` + LocalStorageCapacity uint64 `json:"local_storage_capacity"` + LocalStorageAllocatable uint64 `json:"local_storage_allocatable"` + Labels map[string]string `json:"labels"` + Taints map[string]string `json:"taints"` +} + +// PoolWithGenericNodeSpecs contains the requested `Pool` with additional `Specs` information +type PoolWithGenericNodeSpecs struct { + Pool *Pool `json:"pool"` + Specs GenericNodeSpecs `json:"specs"` +} + +// ListPoolsResponse is returned from `ListPools` method +type ListPoolsResponse struct { + // TotalCount: the total number of pools that exists for the cluster + TotalCount uint32 `json:"total_count"` + // Pools: the paginated returned pools + Pools []*PoolWithGenericNodeSpecs `json:"pools"` +} + +// ListPools returns pools associated to a cluster id, pagination optional +func (c *client) ListPools(ctx context.Context, req *ListPoolsRequest) (*ListPoolsResponse, error) { + klog.V(4).Info("ListPools,ClusterID=", req.ClusterID) + + if req.Page != nil { + return c.listPoolsPaginated(ctx, req) + } + + listPools := func(page int32) (*ListPoolsResponse, error) { + + return c.listPoolsPaginated(ctx, &ListPoolsRequest{ + ClusterID: req.ClusterID, + Page: &page, + }) + } + + page := int32(1) + resp, err := listPools(page) + if err != nil { + return nil, err + } + + nbPages := (resp.TotalCount + pageSizeListPools - 1) / pageSizeListPools + + for uint32(page) <= nbPages { + page++ + r, err := listPools(page) + if err != nil { + return nil, err + } + + resp.Pools = append(resp.Pools, r.Pools...) + + if r.TotalCount != resp.TotalCount { + // pools have changed on scaleway side, retrying + resp.TotalCount = r.TotalCount + resp.Pools = []*PoolWithGenericNodeSpecs{} + page = int32(1) + nbPages = (resp.TotalCount + pageSizeListPools - 1) / pageSizeListPools + } + } + return resp, nil +} + +func (c *client) listPoolsPaginated(ctx context.Context, req *ListPoolsRequest) (*ListPoolsResponse, error) { + var err error + + pageSize := pageSizeListPools + if req.PageSize == nil { + req.PageSize = &pageSize + } + + query := url.Values{} + if req.Page != nil { + query.Set("page", fmt.Sprint(*req.Page)) + } + query.Set("page_size", fmt.Sprint(*req.PageSize)) + + if fmt.Sprint(req.ClusterID) == "" { + return nil, errors.New("field ClusterID cannot be empty in request") + } + + scwReq := &scalewayRequest{ + Method: "GET", + Path: "/k8s/v1/regions/" + fmt.Sprint(c.region) + "/clusters/" + fmt.Sprint(req.ClusterID) + "/pools-autoscaler", + Query: query, + } + + var resp ListPoolsResponse + + err = c.do(ctx, scwReq, &resp) + if err != nil { + return nil, err + } + + return &resp, nil +} + +// UpdatePoolRequest is passed to `UpdatePool` method +type UpdatePoolRequest struct { + // PoolID: the ID of the pool to update + PoolID string `json:"-"` + // Size: the new size for the pool + Size *uint32 `json:"size"` +} + +// UpdatePool is used to resize a pool, to decrease pool size `DeleteNode` should be used instead +func (c *client) UpdatePool(ctx context.Context, req *UpdatePoolRequest) (*Pool, error) { + var err error + + klog.V(4).Info("UpdatePool,PoolID=", req.PoolID) + + if fmt.Sprint(req.PoolID) == "" { + return nil, errors.New("field PoolID cannot be empty in request") + } + + scwReq := &scalewayRequest{ + Method: "PATCH", + Path: "/k8s/v1/regions/" + fmt.Sprint(c.region) + "/pools/" + fmt.Sprint(req.PoolID) + "", + } + + buf, err := json.Marshal(req) + if err != nil { + return nil, err + } + scwReq.Body = bytes.NewReader(buf) + + var resp Pool + + err = c.do(ctx, scwReq, &resp) + if err != nil { + return nil, err + } + return &resp, nil +} + +// ListNodesRequest is passed to `ListNodes` method +type ListNodesRequest struct { + // ClusterID: the cluster ID from which the nodes will be listed from + ClusterID string `json:"-"` + // PoolID: the pool ID on which to filter the returned nodes + PoolID *string `json:"-"` + // Page: the page number for the returned nodes + Page *int32 `json:"-"` + // PageSize: the maximum number of nodes per page + PageSize *uint32 `json:"-"` +} + +// ListNodesResponse is returned from `ListNodes` method +type ListNodesResponse struct { + // TotalCount: the total number of nodes + TotalCount uint32 `json:"total_count"` + // Nodes: the paginated returned nodes + Nodes []*Node `json:"nodes"` +} + +// ListNodes returns the Nodes associated to a Cluster and/or a Pool +func (c *client) ListNodes(ctx context.Context, req *ListNodesRequest) (*ListNodesResponse, error) { + klog.V(4).Info("ListNodes,ClusterID=", req.ClusterID) + + if req.Page != nil { + return c.listNodesPaginated(ctx, req) + } + + listNodes := func(page int32) (*ListNodesResponse, error) { + ctx := context.Background() + + return c.listNodesPaginated(ctx, &ListNodesRequest{ + ClusterID: req.ClusterID, + PoolID: req.PoolID, + Page: &page, + }) + } + + page := int32(1) + resp, err := listNodes(page) + if err != nil { + return nil, err + } + + nbPages := (resp.TotalCount + pageSizeListNodes - 1) / pageSizeListNodes + + for uint32(page) <= nbPages { + page++ + r, err := listNodes(page) + if err != nil { + return nil, err + } + + resp.Nodes = append(resp.Nodes, r.Nodes...) + + if r.TotalCount != resp.TotalCount { + // nodes have changed on scaleway side, retrying + resp.TotalCount = r.TotalCount + resp.Nodes = []*Node{} + page = int32(1) + nbPages = (resp.TotalCount + pageSizeListNodes - 1) / pageSizeListNodes + } + } + return resp, nil +} + +func (c *client) listNodesPaginated(ctx context.Context, req *ListNodesRequest) (*ListNodesResponse, error) { + var err error + + pageSize := pageSizeListNodes + if req.PageSize == nil { + req.PageSize = &pageSize + } + + query := url.Values{} + if req.PoolID != nil { + query.Set("pool_id", fmt.Sprint(*req.PoolID)) + } + if req.Page != nil { + query.Set("page", fmt.Sprint(*req.Page)) + } + query.Set("page_size", fmt.Sprint(*req.PageSize)) + + if fmt.Sprint(req.ClusterID) == "" { + return nil, errors.New("field ClusterID cannot be empty in request") + } + + scwReq := &scalewayRequest{ + Method: "GET", + Path: "/k8s/v1/regions/" + fmt.Sprint(c.region) + "/clusters/" + fmt.Sprint(req.ClusterID) + "/nodes", + Query: query, + } + + var resp ListNodesResponse + + err = c.do(ctx, scwReq, &resp) + if err != nil { + return nil, err + } + return &resp, nil +} + +// DeleteNodeRequest is passed to `DeleteNode` method +type DeleteNodeRequest struct { + NodeID string `json:"-"` +} + +// DeleteNode asynchronously deletes a Node by its id +func (c *client) DeleteNode(ctx context.Context, req *DeleteNodeRequest) (*Node, error) { + var err error + + klog.V(4).Info("DeleteNode,NodeID=", req.NodeID) + + if fmt.Sprint(req.NodeID) == "" { + return nil, errors.New("field NodeID cannot be empty in request") + } + + scwReq := &scalewayRequest{ + Method: "DELETE", + Path: "/k8s/v1/regions/" + fmt.Sprint(c.region) + "/nodes/" + fmt.Sprint(req.NodeID) + "", + } + + var resp Node + + err = c.do(ctx, scwReq, &resp) + if err != nil { + return nil, err + } + return &resp, nil +}