Skip to content

Commit

Permalink
feat: addition of getting machine pools and hitting the register clus…
Browse files Browse the repository at this point in the history
…ter endpoint in KFM
  • Loading branch information
dimakis committed Jan 23, 2023
1 parent 1baae06 commit 85958fc
Show file tree
Hide file tree
Showing 4 changed files with 281 additions and 18 deletions.
39 changes: 39 additions & 0 deletions pkg/cmd/dedicated/dedicatedcmdutil/dedicated_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package kafkacmdutil

import (
"fmt"
"github.com/redhat-developer/app-services-cli/pkg/core/errors"
"github.com/redhat-developer/app-services-cli/pkg/core/localize"
"github.com/redhat-developer/app-services-cli/pkg/shared/factory"
"strconv"
)

// Validator is a type for validating Kafka configuration values
type Validator struct {
Localizer localize.Localizer
Connection factory.ConnectionFunc
}

func ValidateMachinePoolCount(count int) bool {
// check if the count is a multiple of 3 and greater than or equal to 3
if count%3 == 0 && count >= 3 {
return true
}
return false
}

func (v *Validator) ValidatorForMachinePoolNodes(val interface{}) error {
value := fmt.Sprintf("%v", val)
if val == "" {
return errors.NewCastError(val, "emtpy string")
}
value1, err := strconv.Atoi(value)
if err != nil {
return errors.NewCastError(val, "integer")
}
if !ValidateMachinePoolCount(value1) {
return fmt.Errorf("invalid input, machine pool node count must be greater than or equal to 3 and it " +
"must be a is a multiple of 3")
}
return nil
}
246 changes: 229 additions & 17 deletions pkg/cmd/dedicated/register/registercluster.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package register

import (
"context"
"fmt"
"github.com/AlecAivazis/survey/v2"
v1 "github.com/openshift-online/ocm-sdk-go/clustersmgmt/v1"
dedicatedcmdutil "github.com/redhat-developer/app-services-cli/pkg/cmd/dedicated/dedicatedcmdutil"
kafkaFlagutil "github.com/redhat-developer/app-services-cli/pkg/cmd/kafka/flagutil"
kafkamgmtclient "github.com/redhat-developer/app-services-sdk-go/kafkamgmt/apiv1/client"

Expand All @@ -13,18 +15,31 @@ import (
)

type options struct {
selectedClusterId string
kafkaMachinePoolNodeCount int32
clusterList v1.ClusterList
interactive bool
selectedCluster v1.Cluster
clusterMachinePoolList *v1.MachinePoolList
clusterMachinePool v1.MachinePool
requestedMachinePoolNodes int
selectedClusterId string
clusterList v1.ClusterList
//interactive bool
selectedCluster v1.Cluster
clusterMachinePoolList v1.MachinePoolList
existingMachinePoolList []v1.MachinePool
//validatedMachinePoolList []v1.MachinePool
selectedClusterMachinePool v1.MachinePool
requestedMachinePoolNodeCount int
accessKafkasViaPrivateNetwork bool
newMachinePool v1.MachinePool

f *factory.Factory
}

const (
machinePoolId = "kafka-standard"
machinePoolTaintKey = "bf2.org/kafkaInstanceProfileType"
machinePoolTaintEffect = "NoExecute"
machinePoolTaintValue = "standard"
machinePoolInstanceType = "m5.2xlarge"
machinePoolLabelKey = "bf2.org/kafkaInstanceProfileType"
machinePoolLabelValue = "standard"
)

func NewRegisterClusterCommand(f *factory.Factory) *cobra.Command {
opts := &options{
f: f,
Expand Down Expand Up @@ -56,7 +71,9 @@ func NewRegisterClusterCommand(f *factory.Factory) *cobra.Command {
func runRegisterClusterCmd(opts *options) error {
getListClusters(opts)
runClusterSelectionInteractivePrompt(opts)
prepareRequestForKFMEndpoint(opts)
getMachinePoolList(opts)
selectAccessPrivateNetworkInteractivePrompt(opts)
registerClusterWithKFM(opts)

return nil
}
Expand Down Expand Up @@ -88,7 +105,6 @@ func runClusterSelectionInteractivePrompt(opts *options) error {
clusterListString = append(clusterListString, cluster.Name())
}

//clusterList := opts.clusterList.
// TODO add page size and Localizer
prompt := &survey.Select{
Message: "Select the cluster to register",
Expand Down Expand Up @@ -116,23 +132,219 @@ func parseDNSURL(opts *options) (string, error) {
return "", fmt.Errorf("url is empty")
}
return strings.SplitAfter(clusterIngressDNSName, ".apps.")[1], nil
}

// TODO this function should be split the ocm call and the response flow logic
func getMachinePoolList(opts *options) error {
// ocm client connection
conn, err := opts.f.Connection()
if err != nil {
return err
}
client, err := conn.API().OCMClustermgmt()
if err != nil {
return err
}

resource := client.Clusters().Cluster(opts.selectedCluster.ID()).MachinePools().List()
response, err := resource.Send()
if err != nil {
return err
}
if response.Size() == 0 {
createMachinePoolInteractivePrompt(opts)
} else {
for _, machinePool := range response.Items().Slice() {
opts.existingMachinePoolList = append(opts.existingMachinePoolList, *machinePool)
}
err = validateMachinePoolNodes(opts)
if err != nil {
return err
}
}
return nil
}

func checkForValidMachinePoolLabels(machinePool v1.MachinePool) bool {
labels := machinePool.Labels()
for key, value := range labels {
if key == machinePoolLabelKey && value == "standard" {
return true
}
}
return false
}

func validateMachinePoolCount(count int) bool {
if count <= 2 || count%3 != 0 {
return false
}
return true
}

func checkForValidMachinePoolTaints(machinePool v1.MachinePool) bool {
taints := machinePool.Taints()
for _, taint := range taints {
if taint.Effect() == machinePoolTaintEffect && taint.Key() == machinePoolTaintKey {
return true
}
}
return false
}

func createNewMachinePoolTaintsDedicated() *v1.TaintBuilder {
return v1.NewTaint().
Key(machinePoolTaintKey).
Effect(machinePoolTaintEffect).
Value(machinePoolTaintValue)
}

func createNewMachinePoolLabelsDedicated() (*v1.Label, error) {
label := v1.NewLabel().
Key(machinePoolLabelKey).
Value(machinePoolLabelValue)
l, err := label.Build()
if err != nil {
return nil, err
}
return l, nil
}

func createMachinePoolLabelMap(labels *v1.Label) map[string]string {
labelMap := make(map[string]string)
labelMap[labels.Key()] = labels.Value()
return labelMap
}

func createMachinePoolRequestForDedicated(machinePoolNodeCount int) (*v1.MachinePool, error) {
labels, err := createNewMachinePoolLabelsDedicated()
if err != nil {
return nil, err
}
mp := v1.NewMachinePool()
mp.ID(machinePoolId).
Replicas(machinePoolNodeCount).
InstanceType(machinePoolInstanceType).
Labels(createMachinePoolLabelMap(labels)).
Taints(createNewMachinePoolTaintsDedicated())
machinePool, err := mp.Build()
if err != nil {
return nil, err
}
return machinePool, nil
}

// TODO this function should be moved to an ocm client / provider area
func createMachinePool(opts *options, mprequest *v1.MachinePool) error {
// create a new machine pool via ocm
conn, err := opts.f.Connection()
if err != nil {
return err
}
client, err := conn.API().OCMClustermgmt()
if err != nil {
return err
}
response, err := client.Clusters().Cluster(opts.selectedCluster.ID()).MachinePools().Add().Body(mprequest).Send()
if err != nil {
return err
}
opts.selectedClusterMachinePool = *response.Body()
return nil
}

func createMachinePoolInteractivePrompt(opts *options) error {
validator := &dedicatedcmdutil.Validator{
Localizer: opts.f.Localizer,
Connection: opts.f.Connection,
}
// TODO add page size and Localizer
promptNodeCount := &survey.Input{
Message: "Enter the desired machine pool node count",
Help: "The machine pool node count must be greater than or equal to and a multiple of 3",
}
var nodeCount int
err := survey.AskOne(promptNodeCount, &nodeCount, survey.WithValidator(validator.ValidatorForMachinePoolNodes))
if err != nil {
return err
}
opts.requestedMachinePoolNodeCount = nodeCount
dedicatedMachinePool, err := createMachinePoolRequestForDedicated(nodeCount)
if err != nil {
return err
}
err = createMachinePool(opts, dedicatedMachinePool)
if err != nil {
return err
}
return nil
}

// machine pool replica count must be greater than or equal and a multiple of 3
func validateMachinePoolNodes(opts *options) error {
if len(opts.clusterMachinePoolList.Slice()) > 0 {
for _, machinePool := range opts.clusterMachinePoolList.Slice() {
if validateMachinePoolCount(machinePool.Replicas()) &&
checkForValidMachinePoolLabels(*machinePool) &&
checkForValidMachinePoolTaints(*machinePool) {
opts.selectedClusterMachinePool = *machinePool
return nil
} else {
err := createMachinePoolInteractivePrompt(opts)
if err != nil {
return err
}
}
}
}
return nil
}

func selectAccessPrivateNetworkInteractivePrompt(opts *options) error {
options := []string{"Yes", "No"}
prompt := &survey.Select{
Message: "Do you want to access a private network?",
Options: options,
}
err := survey.AskOne(prompt, &options)
if err != nil {
return err
}
if options[0] == "Yes" {
opts.accessKafkasViaPrivateNetwork = true
} else {
opts.accessKafkasViaPrivateNetwork = false
}

return nil
}

// TODO add machine pool logic
func prepareRequestForKFMEndpoint(opts *options) error {
func registerClusterWithKFM(opts *options) error {
clusterIngressDNSName, err := parseDNSURL(opts)
if err != nil {
return err
}
kfmPayload := kafkamgmtclient.EnterpriseOsdClusterPayload{
ClusterId: opts.selectedCluster.ID(),
ClusterExternalId: opts.selectedCluster.ExternalID(),
ClusterIngressDnsName: clusterIngressDNSName,
KafkaMachinePoolNodeCount: opts.kafkaMachinePoolNodeCount,
AccessKafkasViaPrivateNetwork: opts.accessKafkasViaPrivateNetwork,
ClusterId: opts.selectedCluster.ID(),
ClusterExternalId: opts.selectedCluster.ExternalID(),
ClusterIngressDnsName: clusterIngressDNSName,
KafkaMachinePoolNodeCount: int32(opts.selectedClusterMachinePool.Replicas()),
}
opts.f.Logger.Info("kfmPayload: ", kfmPayload)
// TODO add kfm client and call the endpoint

conn, err := opts.f.Connection()
if err != nil {
return err
}
client := conn.API()
resource := client.KafkaMgmtEnterprise().RegisterEnterpriseOsdCluster(context.Background()).EnterpriseOsdClusterPayload(kfmPayload)
response, r, err := resource.Execute()
if err != nil {
return err
return nil
}
opts.f.Logger.Info("response: ", response)
opts.f.Logger.Info("r: ", r)
return nil
}
1 change: 1 addition & 0 deletions pkg/shared/connection/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type API interface {
ServiceRegistryMgmt() registrymgmtclient.RegistriesApi
ConnectorsMgmt() connectormgmtclient.APIClient
ServiceAccountMgmt() svcacctmgmtclient.ServiceAccountsApi
KafkaMgmtEnterprise() kafkamgmtclient.EnterpriseDataplaneClustersApi
KafkaAdmin(instanceID string) (*kafkainstanceclient.APIClient, *kafkamgmtclient.KafkaRequest, error)
ServiceRegistryInstance(instanceID string) (*registryinstanceclient.APIClient, *registrymgmtclient.Registry, error)
AccountMgmt() amsclient.AppServicesApi
Expand Down
13 changes: 12 additions & 1 deletion pkg/shared/connection/api/defaultapi/default_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ func (a *defaultAPI) KafkaMgmt() kafkamgmtclient.DefaultApi {
return client.DefaultApi
}

func (a *defaultAPI) KafkaMgmtEnterprise() kafkamgmtclient.EnterpriseDataplaneClustersApi {
tc := a.CreateOAuthTransport(a.AccessToken)
client := kafkamgmt.NewAPIClient(&kafkamgmt.Config{
BaseURL: a.ApiURL.String(),
Debug: a.Logger.DebugEnabled(),
HTTPClient: tc,
UserAgent: a.UserAgent,
})

return client.EnterpriseDataplaneClustersApi
}

// ServiceRegistryMgmt return a new Service Registry Management API client instance
func (a *defaultAPI) ServiceRegistryMgmt() registrymgmtclient.RegistriesApi {
tc := a.CreateOAuthTransport(a.AccessToken)
Expand Down Expand Up @@ -302,7 +314,6 @@ func (a *defaultAPI) CreateOCMConnection() (*ocmSdkClient.Connection, error) {

// create an OCM clustermgmt client
func (a *defaultAPI) OCMClustermgmt() (*ocmclustersmgmtv1.Client, error) {
// create an OCM connection
conn, err := a.CreateOCMConnection()
if err != nil {
return nil, err
Expand Down

0 comments on commit 85958fc

Please sign in to comment.