Skip to content

Commit

Permalink
refactor: better error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Enda Phelan committed Dec 11, 2020
1 parent b74451f commit eae4683
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 24 deletions.
6 changes: 3 additions & 3 deletions pkg/cmd/kafka/topics/create/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ func createTopic(cmd *cobra.Command, _ []string) {
}
err := topics.ValidateCredentials()
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating credentials for topic: %v\n", topicName)
fmt.Fprintf(os.Stderr, "Error creating credentials for topic: %v\n", err)
return
}
err = topics.CreateKafkaTopic(&topicConfigs)
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating topic: %v\n", topicName)
fmt.Fprintf(os.Stderr, "Error creating topic: %v\n", err)
return
}

fmt.Fprintf(os.Stderr, "Topic %v created\n", topicName)
fmt.Fprintf(os.Stderr, "Topic %v created\n", err)
}
2 changes: 1 addition & 1 deletion pkg/cmd/kafka/topics/delete/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func deleteTopic(cmd *cobra.Command, _ []string) {
fmt.Fprintf(os.Stderr, "Error creating credentials for topic: %v\n", topicName)
return
}
err := topics.DeleteKafkaTopic(topicName)
err = topics.DeleteKafkaTopic(topicName)
if err != nil {
fmt.Fprintf(os.Stderr, "Error deleting topic: %v\n", topicName)
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/kafka/topics/list/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ func listTopic(cmd *cobra.Command, _ []string) {

err := topics.ValidateCredentials()
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating credentials for list")
fmt.Fprintf(os.Stderr, "Error creating credentials for list: %v\n", err)
return
}
err = topics.ListKafkaTopics()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to perform list operation\n")
fmt.Fprintf(os.Stderr, "Failed to perform list operation: %v\n", err)
}
}
2 changes: 0 additions & 2 deletions pkg/cmd/kafka/topics/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/bf2fc6cc711aee1a0c2a/cli/pkg/cmd/kafka/topics/create"
"github.com/bf2fc6cc711aee1a0c2a/cli/pkg/cmd/kafka/topics/delete"
"github.com/bf2fc6cc711aee1a0c2a/cli/pkg/cmd/kafka/topics/list"
"github.com/bf2fc6cc711aee1a0c2a/cli/pkg/cmd/kafka/topics/update"
)

const (
Expand All @@ -25,7 +24,6 @@ func NewTopicsCommand() *cobra.Command {
cmd.AddCommand(
create.NewCreateTopicCommand(),
list.NewListTopicCommand(),
update.NewUpdateTopicCommand(),
delete.NewDeleteTopicCommand(),
)
return cmd
Expand Down
39 changes: 23 additions & 16 deletions pkg/sdk/kafka/topics/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ import (
"github.com/segmentio/kafka-go/sasl/plain"
)

func brokerConnect() (broker *kafka.Conn, ctl *kafka.Conn) {
func brokerConnect() (broker *kafka.Conn, ctl *kafka.Conn, err error) {
cfg, err := config.Load()
if err != nil {
fmt.Fprint(os.Stderr, err)
return nil, nil, err
}
mechanism := plain.Mechanism{
Username: cfg.ServiceAuth.ClientID,
Expand All @@ -32,26 +32,24 @@ func brokerConnect() (broker *kafka.Conn, ctl *kafka.Conn) {
SASLMechanism: mechanism,
}

cfg, err := config.Load()
cfg, err = config.Load()
if err != nil {
fmt.Fprint(os.Stderr, err)
return nil, nil, err
}

if cfg.Services.Kafka.ClusterID == "" {
fmt.Fprint(os.Stderr, "No Kafka selected. Run rhoas kafka use")
panic("Missing config")
return nil, nil, fmt.Errorf("No Kafka selected. Run rhoas kafka use")
}

connection, err := cfg.Connection()
if err != nil {
fmt.Fprintf(os.Stderr, "Could not create connection: %v\n", err)
return nil, nil, fmt.Errorf("Could not create connection: %w", err)
}

managedservices := connection.NewMASClient()
kafkaInstance, _, err := managedservices.DefaultApi.GetKafkaById(context.TODO(), cfg.Services.Kafka.ClusterID)
if err != nil {
fmt.Fprintf(os.Stderr, "Could not get Kafka instance: %v\n", err)
return
return nil, nil, fmt.Errorf("Could not get Kafka instance: %w", err)
}

var clusterURL string
Expand All @@ -63,20 +61,20 @@ func brokerConnect() (broker *kafka.Conn, ctl *kafka.Conn) {

conn, err := dialer.Dial("tcp", clusterURL)
if err != nil {
panic(err.Error())
return nil, nil, err
}

controller, err := conn.Controller()
if err != nil {
panic(err.Error())
return nil, nil, err
}

controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
return nil, nil, err
}

return conn, controllerConn
return conn, controllerConn, nil
}

func ValidateCredentials() error {
Expand Down Expand Up @@ -109,7 +107,10 @@ func ValidateCredentials() error {
}

func CreateKafkaTopic(topicConfigs *[]kafka.TopicConfig) error {
conn, controllerConn := brokerConnect()
conn, controllerConn, err := brokerConnect()
if err != nil {
return err
}

defer conn.Close()
defer controllerConn.Close()
Expand All @@ -118,7 +119,10 @@ func CreateKafkaTopic(topicConfigs *[]kafka.TopicConfig) error {
}

func DeleteKafkaTopic(topic string) error {
conn, controllerConn := brokerConnect()
conn, controllerConn, err := brokerConnect()
if err != nil {
return err
}

defer conn.Close()
defer controllerConn.Close()
Expand All @@ -127,7 +131,10 @@ func DeleteKafkaTopic(topic string) error {
}

func ListKafkaTopics() error {
conn, controllerConn := brokerConnect()
conn, controllerConn, err := brokerConnect()
if err != nil {
return err
}

defer conn.Close()
defer controllerConn.Close()
Expand Down

0 comments on commit eae4683

Please sign in to comment.