Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Initial version of SASL/Plain support for topic creation #161

Merged
merged 4 commits into from
Dec 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions pkg/cmd/kafka/topics/create/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,16 @@ func createTopic(cmd *cobra.Command, _ []string) {
ReplicationFactor: int(replicas),
},
}

err := topics.CreateKafkaTopic(&topicConfigs)
err := topics.ValidateCredentials()
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating credentials for topic: %v\n", err)
return
}
err = topics.CreateKafkaTopic(&topicConfigs)
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating topic: %v\n", topicName)
fmt.Fprintf(os.Stderr, "Error creating topic: %v\n", err)
return
}

fmt.Fprintf(os.Stderr, "Topic %v created\n", topicName)
fmt.Fprintf(os.Stderr, "Topic %v created\n", err)
}
7 changes: 6 additions & 1 deletion pkg/cmd/kafka/topics/delete/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ func NewDeleteTopicCommand() *cobra.Command {

func deleteTopic(cmd *cobra.Command, _ []string) {
fmt.Fprintf(os.Stderr, "Deleting topic %v\n", topicName)
err := topics.DeleteKafkaTopic(topicName)
err := topics.ValidateCredentials()
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating credentials for topic: %v\n", topicName)
return
}
err = topics.DeleteKafkaTopic(topicName)
if err != nil {
fmt.Fprintf(os.Stderr, "Error deleting topic: %v\n", topicName)
return
Expand Down
12 changes: 8 additions & 4 deletions pkg/cmd/kafka/topics/list/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ func NewListTopicCommand() *cobra.Command {
}

func listTopic(cmd *cobra.Command, _ []string) {
fmt.Fprintln(os.Stderr, "Topics:")

err := topics.ListKafkaTopics()

err := topics.ValidateCredentials()
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating credentials for list: %v\n", err)
return
}
fmt.Fprintln(os.Stderr, "Topics:")
err = topics.ListKafkaTopics()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to perform list operation\n")
fmt.Fprintf(os.Stderr, "Failed to perform list operation: %v\n", err)
}
}
2 changes: 0 additions & 2 deletions pkg/cmd/kafka/topics/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/bf2fc6cc711aee1a0c2a/cli/pkg/cmd/kafka/topics/create"
"github.com/bf2fc6cc711aee1a0c2a/cli/pkg/cmd/kafka/topics/delete"
"github.com/bf2fc6cc711aee1a0c2a/cli/pkg/cmd/kafka/topics/list"
"github.com/bf2fc6cc711aee1a0c2a/cli/pkg/cmd/kafka/topics/update"
)

const (
Expand All @@ -25,7 +24,6 @@ func NewTopicsCommand() *cobra.Command {
cmd.AddCommand(
create.NewCreateTopicCommand(),
list.NewListTopicCommand(),
update.NewUpdateTopicCommand(),
delete.NewDeleteTopicCommand(),
)
return cmd
Expand Down
53 changes: 0 additions & 53 deletions pkg/cmd/kafka/topics/update/update.go

This file was deleted.

7 changes: 7 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ type Config struct {
ClientID string `json:"client_id,omitempty" doc:"OpenID client identifier."`
Insecure bool `json:"insecure,omitempty" doc:"Enables insecure communication with the server. This disables verification of TLS certificates and host names."`
Scopes []string `json:"scopes,omitempty" doc:"OpenID scope. If this option is used it will replace completely the default scopes. Can be repeated multiple times to specify multiple scopes."`
ServiceAuth ServiceAuth `json:"serviceAuth,omitempty"`
}

// ServiceAuth for cli authentication within enabled services
type ServiceAuth struct {
ClientID string `json:"clientID"`
ClientSecret string `json:"clientSecret"`
}

// ServiceConfigMap is a map of configs for the managed application services
Expand Down
87 changes: 64 additions & 23 deletions pkg/sdk/kafka/topics/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,47 @@ import (
"strings"
"time"

"github.com/bf2fc6cc711aee1a0c2a/cli/pkg/api/managedservices"
"github.com/bf2fc6cc711aee1a0c2a/cli/pkg/config"
"github.com/fatih/color"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
)

func brokerConnect() (broker *kafka.Conn, ctl *kafka.Conn) {
// TODO enable and configure SASL plain
// mechanism := plain.Mechanism{
// Username: "username",
// Password: "password",
// }
func brokerConnect() (broker *kafka.Conn, ctl *kafka.Conn, err error) {
cfg, err := config.Load()
if err != nil {
return nil, nil, err
}
mechanism := plain.Mechanism{
Username: cfg.ServiceAuth.ClientID,
Password: cfg.ServiceAuth.ClientSecret,
}

dialer := &kafka.Dialer{
Timeout: 100 * time.Second,
DualStack: true,
//SASLMechanism: mechanism,
Timeout: 100 * time.Second,
DualStack: true,
SASLMechanism: mechanism,
}

cfg, err := config.Load()
cfg, err = config.Load()
if err != nil {
fmt.Fprint(os.Stderr, err)
return nil, nil, err
}

if cfg.Services.Kafka.ClusterID == "" {
fmt.Fprint(os.Stderr, "No Kafka selected. Run rhoas kafka use")
panic("Missing config")
return nil, nil, fmt.Errorf("No Kafka selected. Run rhoas kafka use")
}

connection, err := cfg.Connection()
if err != nil {
fmt.Fprintf(os.Stderr, "Could not create connection: %v\n", err)
return nil, nil, fmt.Errorf("Could not create connection: %w", err)
}

managedservices := connection.NewMASClient()
kafkaInstance, _, err := managedservices.DefaultApi.GetKafkaById(context.TODO(), cfg.Services.Kafka.ClusterID)
if err != nil {
fmt.Fprintf(os.Stderr, "Could not get Kafka instance: %v\n", err)
return
return nil, nil, fmt.Errorf("Could not get Kafka instance: %w", err)
}

var clusterURL string
Expand All @@ -58,24 +61,56 @@ func brokerConnect() (broker *kafka.Conn, ctl *kafka.Conn) {

conn, err := dialer.Dial("tcp", clusterURL)
if err != nil {
panic(err.Error())
return nil, nil, err
}

controller, err := conn.Controller()
if err != nil {
panic(err.Error())
return nil, nil, err
}

controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
return nil, nil, err
}

return conn, controllerConn, nil
}

func ValidateCredentials() error {
cfg, err := config.Load()
if err != nil {
fmt.Fprint(os.Stderr, err)
}

return conn, controllerConn
if cfg.ServiceAuth.ClientID == "" {
connection, err := cfg.Connection()
if err != nil {
return fmt.Errorf("Can't create connection: %w", err)
}
client := connection.NewMASClient()
fmt.Fprint(os.Stderr, "\nNo Service credentials. \nCreating service account for CLI\n")
svcAcctPayload := &managedservices.ServiceAccountRequest{Name: "RHOAS-CLI", Description: "RHOAS-CLI Service Account"}
response, _, err := client.DefaultApi.CreateServiceAccount(context.Background(), *svcAcctPayload)
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
return err
}
cfg.ServiceAuth.ClientID = response.ClientID
cfg.ServiceAuth.ClientSecret = response.ClientSecret
if err = config.Save(cfg); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
return err
}
}
return nil
}

func CreateKafkaTopic(topicConfigs *[]kafka.TopicConfig) error {
conn, controllerConn := brokerConnect()
conn, controllerConn, err := brokerConnect()
if err != nil {
return err
}

defer conn.Close()
defer controllerConn.Close()
Expand All @@ -84,7 +119,10 @@ func CreateKafkaTopic(topicConfigs *[]kafka.TopicConfig) error {
}

func DeleteKafkaTopic(topic string) error {
conn, controllerConn := brokerConnect()
conn, controllerConn, err := brokerConnect()
if err != nil {
return err
}

defer conn.Close()
defer controllerConn.Close()
Expand All @@ -93,7 +131,10 @@ func DeleteKafkaTopic(topic string) error {
}

func ListKafkaTopics() error {
conn, controllerConn := brokerConnect()
conn, controllerConn, err := brokerConnect()
if err != nil {
return err
}

defer conn.Close()
defer controllerConn.Close()
Expand Down