Adding rosa classic to workers scale workload #107

Merged
merged 12 commits, Sep 25, 2024
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package workers_scale
package workerscale

import (
"context"
@@ -275,7 +275,7 @@ func waitForMachineSets(machineClient *machinev1beta1.MachineV1beta1Client, clie
wg.Add(1)
go func(ms string, r int) {
defer wg.Done()
err := waitForMachineSet(machineClient, ms, int32(r), maxWaitTimeout)
err := waitForMachineSet(machineClient, ms, int32(r))
if err != nil {
log.Errorf("Failed waiting for MachineSet %s: %v", ms, err)
}
@@ -284,7 +284,7 @@ func waitForMachineSets(machineClient *machinev1beta1.MachineV1beta1Client, clie
})
wg.Wait()
log.Infof("All the machinesets have been scaled")
if err := waitForNodes(clientSet, maxWaitTimeout); err != nil {
log.Infof("Error waiting for nodes: %v", err)
if err := waitForNodes(clientSet); err != nil {
log.Fatalf("Error waiting for nodes: %v", err)
}
}
6 changes: 3 additions & 3 deletions pkg/workers_scale/base.go → pkg/workerscale/base.go
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package workers_scale
package workerscale

import (
"sort"
@@ -35,8 +35,8 @@ func (awsScenario *BaseScenario) OrchestrateWorkload(scaleConfig ScaleConfig) {
log.Info("Scale event epoch time specified. Hence calculating node latencies without any scaling")
setupMetrics(scaleConfig.UUID, scaleConfig.Metadata, kubeClientProvider)
measurements.Start()
if err := waitForNodes(clientSet, maxWaitTimeout); err != nil {
log.Infof("Error waiting for nodes: %v", err)
if err := waitForNodes(clientSet); err != nil {
log.Fatalf("Error waiting for nodes: %v", err)
}
if err = measurements.Stop(); err != nil {
log.Error(err.Error())
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package workers_scale
package workerscale

import "time"

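Several helpers in this diff (waitForMachineSet, updateMachineSetReplicas, waitForNodes) drop their maxWaitTimeout parameter, so the polling code now relies on a shared package-level timeout whose declaration is not visible in this view. A minimal sketch of such a constant, with an illustrative value only:

```go
package workerscale

import "time"

// maxWaitTimeout is the shared polling deadline consumed by waitForMachineSet,
// waitForWorkerMachineSets, and waitForNodes. The 4h value is illustrative;
// the real declaration lives elsewhere in pkg/workerscale.
const maxWaitTimeout = 4 * time.Hour
```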
34 changes: 28 additions & 6 deletions pkg/workers_scale/machines.go → pkg/workerscale/machines.go
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package workers_scale
package workerscale

import (
"context"
@@ -90,7 +90,7 @@ func editMachineSets(machineClient *machinev1beta1.MachineV1beta1Client, clientS
wg.Add(1)
go func(ms string, r int) {
defer wg.Done()
err := updateMachineSetReplicas(machineClient, ms, int32(r), maxWaitTimeout, machineSetsToEdit)
err := updateMachineSetReplicas(machineClient, ms, int32(r), machineSetsToEdit)
if err != nil {
log.Errorf("Failed to edit MachineSet %s: %v", ms, err)
}
@@ -99,13 +99,13 @@
})
wg.Wait()
log.Infof("All the machinesets have been editted")
if err := waitForNodes(clientSet, maxWaitTimeout); err != nil {
if err := waitForNodes(clientSet); err != nil {
log.Infof("Error waiting for nodes: %v", err)
}
}

// updateMachineSetReplicas updates a machineset's replica count
func updateMachineSetReplicas(machineClient *machinev1beta1.MachineV1beta1Client, name string, newReplicaCount int32, maxWaitTimeout time.Duration, machineSetsToEdit *sync.Map) error {
func updateMachineSetReplicas(machineClient *machinev1beta1.MachineV1beta1Client, name string, newReplicaCount int32, machineSetsToEdit *sync.Map) error {
machineSet, err := machineClient.MachineSets(machineNamespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error getting machineset: %s", err)
@@ -122,7 +122,7 @@ func updateMachineSetReplicas(machineClient *machinev1beta1.MachineV1beta1Client
msInfo.lastUpdatedTime = updateTimestamp
machineSetsToEdit.Store(name, msInfo)

err = waitForMachineSet(machineClient, name, newReplicaCount, maxWaitTimeout)
err = waitForMachineSet(machineClient, name, newReplicaCount)
if err != nil {
return fmt.Errorf("timeout waiting for MachineSet %s to be ready: %v", name, err)
}
@@ -151,7 +151,7 @@ func getMachineSets(machineClient *machinev1beta1.MachineV1beta1Client) map[int]
}

// waitForMachineSet waits for machinesets to be ready with new replica count
func waitForMachineSet(machineClient *machinev1beta1.MachineV1beta1Client, name string, newReplicaCount int32, maxWaitTimeout time.Duration) error {
func waitForMachineSet(machineClient *machinev1beta1.MachineV1beta1Client, name string, newReplicaCount int32) error {
return wait.PollUntilContextTimeout(context.TODO(), time.Second, maxWaitTimeout, true, func(ctx context.Context) (done bool, err error) {
ms, err := machineClient.MachineSets(machineNamespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
@@ -164,3 +164,25 @@ func waitForMachineSet(machineClient *machinev1beta1.MachineV1beta1Client, name
return false, nil
})
}

// waitForWorkerMachineSets waits for all the worker machinesets to be ready
func waitForWorkerMachineSets(machineClient *machinev1beta1.MachineV1beta1Client) error {
return wait.PollUntilContextTimeout(context.TODO(), time.Second, maxWaitTimeout, true, func(_ context.Context) (done bool, err error) {
// Get all MachineSets with the worker label
labelSelector := metav1.ListOptions{
LabelSelector: "hive.openshift.io/machine-pool=worker",
}
machineSets, err := machineClient.MachineSets(machineNamespace).List(context.TODO(), labelSelector)
if err != nil {
return false, err
}
for _, ms := range machineSets.Items {
if ms.Status.Replicas != ms.Status.ReadyReplicas {
log.Debugf("Waiting for MachineSet %s to reach %d replicas, currently %d ready", ms.Name, ms.Status.Replicas, ms.Status.ReadyReplicas)
return false, nil
}
}
log.Info("All worker MachineSets have reached desired replica count")
return true, nil
})
}
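To spot-check the state the new waitForWorkerMachineSets helper polls, here is a standalone sketch that lists worker MachineSets with the same label selector. The kubeconfig path is a placeholder, and the openshift-machine-api namespace and machinev1beta1.NewForConfig are assumptions about how machineNamespace and getMachineClient are defined elsewhere in the package:

```go
package main

import (
	"context"
	"fmt"
	"log"

	machinev1beta1 "github.com/openshift/client-go/machine/clientset/versioned/typed/machine/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a rest.Config from a local kubeconfig (path is a placeholder).
	restConfig, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		log.Fatalf("Error building kubeconfig: %v", err)
	}
	// Typed Machine API client, presumably what getMachineClient wraps.
	machineClient, err := machinev1beta1.NewForConfig(restConfig)
	if err != nil {
		log.Fatalf("Error creating machine client: %v", err)
	}
	// List worker MachineSets using the same Hive machine-pool label as above.
	machineSets, err := machineClient.MachineSets("openshift-machine-api").List(context.TODO(), metav1.ListOptions{
		LabelSelector: "hive.openshift.io/machine-pool=worker",
	})
	if err != nil {
		log.Fatalf("Error listing machinesets: %v", err)
	}
	for _, ms := range machineSets.Items {
		fmt.Printf("%s: %d/%d replicas ready\n", ms.Name, ms.Status.ReadyReplicas, ms.Status.Replicas)
	}
}
```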
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package workers_scale
package workerscale

import (
"strings"
151 changes: 151 additions & 0 deletions pkg/workerscale/rosa.go
@@ -0,0 +1,151 @@
// Copyright 2024 The Kube-burner Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package workerscale

import (
"context"
"fmt"
"os/exec"
"sync"
"time"

"github.com/kube-burner/kube-burner/pkg/config"
"github.com/kube-burner/kube-burner/pkg/measurements"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
)

type RosaScenario struct{}

// OrchestrateWorkload orchestrates the rosa scaling workload
func (rosaScenario *RosaScenario) OrchestrateWorkload(scaleConfig ScaleConfig) {
var err error
var triggerJob string
kubeClientProvider := config.NewKubeClientProvider("", "")
clientSet, restConfig := kubeClientProvider.ClientSet(0, 0)
machineClient := getMachineClient(restConfig)
dynamicClient := dynamic.NewForConfigOrDie(restConfig)
clusterID := getClusterID(dynamicClient)
if scaleConfig.ScaleEventEpoch != 0 {
log.Info("Scale event epoch time specified. Hence calculating node latencies without any scaling")
setupMetrics(scaleConfig.UUID, scaleConfig.Metadata, kubeClientProvider)
measurements.Start()
if err := waitForNodes(clientSet); err != nil {
log.Fatalf("Error waiting for nodes: %v", err)
}
if err = measurements.Stop(); err != nil {
log.Error(err.Error())
}
scaledMachineDetails, amiID := getMachines(machineClient, scaleConfig.ScaleEventEpoch)
finalizeMetrics(&sync.Map{}, scaledMachineDetails, scaleConfig.Indexer, amiID, scaleConfig.ScaleEventEpoch)
} else {
prevMachineDetails, _ := getMachines(machineClient, 0)
setupMetrics(scaleConfig.UUID, scaleConfig.Metadata, kubeClientProvider)
measurements.Start()
log.Info("Updating machinepool to the desired worker count")
triggerTime := editMachinepool(clusterID, len(prevMachineDetails), len(prevMachineDetails)+scaleConfig.AdditionalWorkerNodes, scaleConfig.AutoScalerEnabled)
if scaleConfig.AutoScalerEnabled {
triggerJob, triggerTime = createBatchJob(clientSet)
// Delay for the clusterautoscaler resources to come up
time.Sleep(5 * time.Minute)
} else {
// Delay for rosa to update the machinesets
time.Sleep(1 * time.Minute)
}
log.Info("Waiting for the machinesets to be ready")
err = waitForWorkerMachineSets(machineClient)
if err != nil {
log.Fatalf("Error waitingMachineSets to be ready: %v", err)
}
if err := waitForNodes(clientSet); err != nil {
log.Fatalf("Error waiting for nodes: %v", err)
}
if err = measurements.Stop(); err != nil {
log.Error(err.Error())
}
scaledMachineDetails, amiID := getMachines(machineClient, 0)
discardPreviousMachines(prevMachineDetails, scaledMachineDetails)
finalizeMetrics(&sync.Map{}, scaledMachineDetails, scaleConfig.Indexer, amiID, triggerTime.Unix())
if scaleConfig.GC {
if scaleConfig.AutoScalerEnabled {
deleteBatchJob(clientSet, triggerJob)
}
log.Info("Restoring machine sets to previous state")
editMachinepool(clusterID, len(prevMachineDetails), len(prevMachineDetails), false)
}
}
}

// editMachinepool edits the machinepool to the desired replica count
func editMachinepool(clusterID string, minReplicas int, maxReplicas int, autoScalerEnabled bool) time.Time {
verifyRosaInstall()
triggerTime := time.Now().UTC().Truncate(time.Second)
cmdArgs := []string{"edit", "machinepool", "-c", clusterID, "worker", fmt.Sprintf("--enable-autoscaling=%t", autoScalerEnabled)}
if autoScalerEnabled {
cmdArgs = append(cmdArgs, fmt.Sprintf("--min-replicas=%d", minReplicas))
cmdArgs = append(cmdArgs, fmt.Sprintf("--max-replicas=%d", maxReplicas))
} else {
cmdArgs = append(cmdArgs, fmt.Sprintf("--replicas=%d", minReplicas))
}
cmd := exec.Command("rosa", cmdArgs...)
editOutput, err := cmd.CombinedOutput()
if err != nil {
log.Fatalf("Failed to edit machinepool: %v. Output: %s", err, string(editOutput))
}
log.Infof("Machinepool edited successfully on cluster: %v", clusterID)
log.Debug(string(editOutput))
return triggerTime
}

// verifyRosaInstall verifies rosa installation and login
func verifyRosaInstall() {
if _, err := exec.LookPath("rosa"); err != nil {
log.Fatal("ROSA CLI is not installed. Please install it and retry.")
return
}
log.Info("ROSA CLI is installed.")

cmd := exec.Command("rosa", "whoami")
output, err := cmd.CombinedOutput()
if err != nil {
log.Fatal("You are not logged in. Please login using 'rosa login' and retry.")
}
log.Info("You are already logged in.")
log.Debug(string(output))
}

// getClusterID fetches the clusterID
func getClusterID(dynamicClient dynamic.Interface) string {
clusterVersionGVR := schema.GroupVersionResource{
Group: "config.openshift.io",
Version: "v1",
Resource: "clusterversions",
}

clusterVersion, err := dynamicClient.Resource(clusterVersionGVR).Get(context.TODO(), "version", metav1.GetOptions{})
if err != nil {
log.Fatalf("Error fetching cluster version: %v", err)
}

clusterID, found, err := unstructured.NestedString(clusterVersion.Object, "spec", "clusterID")
if err != nil || !found {
log.Fatalf("Error retrieving cluster ID: %v", err)
}

return clusterID
}
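For reference, with autoscaling enabled editMachinepool above shells out to a command of the form `rosa edit machinepool -c <cluster-id> worker --enable-autoscaling=true --min-replicas=3 --max-replicas=6`, and with autoscaling disabled to `rosa edit machinepool -c <cluster-id> worker --enable-autoscaling=false --replicas=3`; the replica numbers are illustrative, while the flags mirror the cmdArgs the function builds.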
3 changes: 2 additions & 1 deletion pkg/workers_scale/types.go → pkg/workerscale/types.go
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package workers_scale
package workerscale

import (
"time"
@@ -34,6 +34,7 @@ type ScaleConfig struct {
Indexer indexers.Indexer
GC bool
ScaleEventEpoch int64
AutoScalerEnabled bool
}

// Struct to extract AMIID from aws provider spec
4 changes: 2 additions & 2 deletions pkg/workers_scale/utils.go → pkg/workerscale/utils.go
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package workers_scale
package workerscale

import (
"context"
@@ -62,7 +62,7 @@ func isNodeReady(node *v1.Node) bool {
}

// waitForNodes waits for all the nodes to be ready
func waitForNodes(clientset kubernetes.Interface, maxWaitTimeout time.Duration) error {
func waitForNodes(clientset kubernetes.Interface) error {
return wait.PollUntilContextTimeout(context.TODO(), time.Second, maxWaitTimeout, true, func(ctx context.Context) (done bool, err error) {
nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
16 changes: 10 additions & 6 deletions workers-scale.go
@@ -1,4 +1,4 @@
// Copyright 2022 The Kube-burner Authors.
// Copyright 2024 The Kube-burner Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -31,7 +31,7 @@ import (
"github.com/kube-burner/kube-burner/pkg/workloads"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
wscale "kube-burner.io/ocp/pkg/workers_scale"
wscale "kube-burner.io/ocp/pkg/workerscale"
)

// NewWorkersScale orchestrates scaling workers in ocp wrapper
@@ -126,14 +126,15 @@ func NewWorkersScale(metricsEndpoint *string, ocpMetaAgent *ocpmetadata.Metadata
indexerValue = value
break
}
scenario := fetchScenario(enableAutoscaler)
scenario := fetchScenario(enableAutoscaler, clusterMetadata)
scenario.OrchestrateWorkload(wscale.ScaleConfig{
UUID: uuid,
AdditionalWorkerNodes: additionalWorkerNodes,
Metadata: metricsScraper.Metadata,
Indexer: indexerValue,
GC: gc,
ScaleEventEpoch: scaleEventEpoch,
AutoScalerEnabled: enableAutoscaler,
})
end := time.Now().Unix()
for _, prometheusClient := range metricsScraper.PrometheusClients {
@@ -182,10 +183,13 @@
}

// fetchScenario returns the relevant scenario implementation
func fetchScenario(enableAutoscaler bool) wscale.Scenario {
if enableAutoscaler {
return &wscale.AutoScalerScenario{}
func fetchScenario(enableAutoscaler bool, clusterMetadata ocpmetadata.ClusterMetadata) wscale.Scenario {
if clusterMetadata.ClusterType == "rosa" {
return &wscale.RosaScenario{}
} else {
if enableAutoscaler {
return &wscale.AutoScalerScenario{}
}
return &wscale.BaseScenario{}
}
}
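For context, fetchScenario returns a wscale.Scenario, which RosaScenario, AutoScalerScenario, and BaseScenario all satisfy; the interface definition itself is not part of this diff. A minimal sketch, assuming it carries only the OrchestrateWorkload method used here:

```go
package workerscale

// Scenario is the abstraction fetchScenario hands back to the ocp wrapper.
// Sketch only: the real definition lives elsewhere in pkg/workerscale and may
// include additional methods.
type Scenario interface {
	OrchestrateWorkload(scaleConfig ScaleConfig)
}
```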