Skip to content

Commit

Permalink
fix: Initial version of SASL/Plain support for topic creation (#161)
Browse files Browse the repository at this point in the history
Co-authored-by: Enda Phelan <[email protected]>
  • Loading branch information
wtrocki and Enda Phelan authored Dec 11, 2020
1 parent a3468ed commit 6df805a
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 87 deletions.
12 changes: 8 additions & 4 deletions pkg/cmd/kafka/topics/create/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,16 @@ func createTopic(cmd *cobra.Command, _ []string) {
ReplicationFactor: int(replicas),
},
}

err := topics.CreateKafkaTopic(&topicConfigs)
err := topics.ValidateCredentials()
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating credentials for topic: %v\n", err)
return
}
err = topics.CreateKafkaTopic(&topicConfigs)
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating topic: %v\n", topicName)
fmt.Fprintf(os.Stderr, "Error creating topic: %v\n", err)
return
}

fmt.Fprintf(os.Stderr, "Topic %v created\n", topicName)
fmt.Fprintf(os.Stderr, "Topic %v created\n", err)
}
7 changes: 6 additions & 1 deletion pkg/cmd/kafka/topics/delete/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ func NewDeleteTopicCommand() *cobra.Command {

func deleteTopic(cmd *cobra.Command, _ []string) {
fmt.Fprintf(os.Stderr, "Deleting topic %v\n", topicName)
err := topics.DeleteKafkaTopic(topicName)
err := topics.ValidateCredentials()
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating credentials for topic: %v\n", topicName)
return
}
err = topics.DeleteKafkaTopic(topicName)
if err != nil {
fmt.Fprintf(os.Stderr, "Error deleting topic: %v\n", topicName)
return
Expand Down
12 changes: 8 additions & 4 deletions pkg/cmd/kafka/topics/list/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ func NewListTopicCommand() *cobra.Command {
}

func listTopic(cmd *cobra.Command, _ []string) {
fmt.Fprintln(os.Stderr, "Topics:")

err := topics.ListKafkaTopics()

err := topics.ValidateCredentials()
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating credentials for list: %v\n", err)
return
}
fmt.Fprintln(os.Stderr, "Topics:")
err = topics.ListKafkaTopics()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to perform list operation\n")
fmt.Fprintf(os.Stderr, "Failed to perform list operation: %v\n", err)
}
}
2 changes: 0 additions & 2 deletions pkg/cmd/kafka/topics/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/bf2fc6cc711aee1a0c2a/cli/pkg/cmd/kafka/topics/create"
"github.com/bf2fc6cc711aee1a0c2a/cli/pkg/cmd/kafka/topics/delete"
"github.com/bf2fc6cc711aee1a0c2a/cli/pkg/cmd/kafka/topics/list"
"github.com/bf2fc6cc711aee1a0c2a/cli/pkg/cmd/kafka/topics/update"
)

const (
Expand All @@ -25,7 +24,6 @@ func NewTopicsCommand() *cobra.Command {
cmd.AddCommand(
create.NewCreateTopicCommand(),
list.NewListTopicCommand(),
update.NewUpdateTopicCommand(),
delete.NewDeleteTopicCommand(),
)
return cmd
Expand Down
53 changes: 0 additions & 53 deletions pkg/cmd/kafka/topics/update/update.go

This file was deleted.

7 changes: 7 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ type Config struct {
ClientID string `json:"client_id,omitempty" doc:"OpenID client identifier."`
Insecure bool `json:"insecure,omitempty" doc:"Enables insecure communication with the server. This disables verification of TLS certificates and host names."`
Scopes []string `json:"scopes,omitempty" doc:"OpenID scope. If this option is used it will replace completely the default scopes. Can be repeated multiple times to specify multiple scopes."`
ServiceAuth ServiceAuth `json:"serviceAuth,omitempty"`
}

// ServiceAuth for cli authentication within enabled services
type ServiceAuth struct {
ClientID string `json:"clientID"`
ClientSecret string `json:"clientSecret"`
}

// ServiceConfigMap is a map of configs for the managed application services
Expand Down
87 changes: 64 additions & 23 deletions pkg/sdk/kafka/topics/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,47 @@ import (
"strings"
"time"

"github.com/bf2fc6cc711aee1a0c2a/cli/pkg/api/managedservices"
"github.com/bf2fc6cc711aee1a0c2a/cli/pkg/config"
"github.com/fatih/color"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
)

func brokerConnect() (broker *kafka.Conn, ctl *kafka.Conn) {
// TODO enable and configure SASL plain
// mechanism := plain.Mechanism{
// Username: "username",
// Password: "password",
// }
func brokerConnect() (broker *kafka.Conn, ctl *kafka.Conn, err error) {
cfg, err := config.Load()
if err != nil {
return nil, nil, err
}
mechanism := plain.Mechanism{
Username: cfg.ServiceAuth.ClientID,
Password: cfg.ServiceAuth.ClientSecret,
}

dialer := &kafka.Dialer{
Timeout: 100 * time.Second,
DualStack: true,
//SASLMechanism: mechanism,
Timeout: 100 * time.Second,
DualStack: true,
SASLMechanism: mechanism,
}

cfg, err := config.Load()
cfg, err = config.Load()
if err != nil {
fmt.Fprint(os.Stderr, err)
return nil, nil, err
}

if cfg.Services.Kafka.ClusterID == "" {
fmt.Fprint(os.Stderr, "No Kafka selected. Run rhoas kafka use")
panic("Missing config")
return nil, nil, fmt.Errorf("No Kafka selected. Run rhoas kafka use")
}

connection, err := cfg.Connection()
if err != nil {
fmt.Fprintf(os.Stderr, "Could not create connection: %v\n", err)
return nil, nil, fmt.Errorf("Could not create connection: %w", err)
}

managedservices := connection.NewMASClient()
kafkaInstance, _, err := managedservices.DefaultApi.GetKafkaById(context.TODO(), cfg.Services.Kafka.ClusterID)
if err != nil {
fmt.Fprintf(os.Stderr, "Could not get Kafka instance: %v\n", err)
return
return nil, nil, fmt.Errorf("Could not get Kafka instance: %w", err)
}

var clusterURL string
Expand All @@ -58,24 +61,56 @@ func brokerConnect() (broker *kafka.Conn, ctl *kafka.Conn) {

conn, err := dialer.Dial("tcp", clusterURL)
if err != nil {
panic(err.Error())
return nil, nil, err
}

controller, err := conn.Controller()
if err != nil {
panic(err.Error())
return nil, nil, err
}

controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
return nil, nil, err
}

return conn, controllerConn, nil
}

func ValidateCredentials() error {
cfg, err := config.Load()
if err != nil {
fmt.Fprint(os.Stderr, err)
}

return conn, controllerConn
if cfg.ServiceAuth.ClientID == "" {
connection, err := cfg.Connection()
if err != nil {
return fmt.Errorf("Can't create connection: %w", err)
}
client := connection.NewMASClient()
fmt.Fprint(os.Stderr, "\nNo Service credentials. \nCreating service account for CLI\n")
svcAcctPayload := &managedservices.ServiceAccountRequest{Name: "RHOAS-CLI", Description: "RHOAS-CLI Service Account"}
response, _, err := client.DefaultApi.CreateServiceAccount(context.Background(), *svcAcctPayload)
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
return err
}
cfg.ServiceAuth.ClientID = response.ClientID
cfg.ServiceAuth.ClientSecret = response.ClientSecret
if err = config.Save(cfg); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
return err
}
}
return nil
}

func CreateKafkaTopic(topicConfigs *[]kafka.TopicConfig) error {
conn, controllerConn := brokerConnect()
conn, controllerConn, err := brokerConnect()
if err != nil {
return err
}

defer conn.Close()
defer controllerConn.Close()
Expand All @@ -84,7 +119,10 @@ func CreateKafkaTopic(topicConfigs *[]kafka.TopicConfig) error {
}

func DeleteKafkaTopic(topic string) error {
conn, controllerConn := brokerConnect()
conn, controllerConn, err := brokerConnect()
if err != nil {
return err
}

defer conn.Close()
defer controllerConn.Close()
Expand All @@ -93,7 +131,10 @@ func DeleteKafkaTopic(topic string) error {
}

func ListKafkaTopics() error {
conn, controllerConn := brokerConnect()
conn, controllerConn, err := brokerConnect()
if err != nil {
return err
}

defer conn.Close()
defer controllerConn.Close()
Expand Down

0 comments on commit 6df805a

Please sign in to comment.