-
Notifications
You must be signed in to change notification settings - Fork 72
/
topics.go
101 lines (80 loc) · 2.1 KB
/
topics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package topics
import (
"fmt"
"net"
"os"
"strconv"
"strings"
"time"
"github.com/bf2fc6cc711aee1a0c2a/cli/pkg/config"
"github.com/fatih/color"
"github.com/segmentio/kafka-go"
)
// brokerConnect opens two connections to the Kafka cluster selected in the
// CLI configuration: broker is a connection to the configured cluster host,
// and ctl is a connection to the controller reported by that host.
//
// The cluster host is read from cfg.Services.Kafka.ClusterHost. A host that
// starts with "localhost" is dialed as-is; any other host has ":443" appended.
//
// The function panics when the configuration cannot be loaded, when no
// cluster host is configured, or when any connection step fails. On success
// the caller owns both connections and is responsible for closing them.
func brokerConnect() (broker *kafka.Conn, ctl *kafka.Conn) {
	// TODO enable and configure SASL plain
	// mechanism := plain.Mechanism{
	// Username: "username",
	// Password: "password",
	// }
	dialer := &kafka.Dialer{
		Timeout:   100 * time.Second,
		DualStack: true,
		//SASLMechanism: mechanism,
	}

	cfg, err := config.Load()
	if err != nil {
		// Stop here: continuing would read the cluster host from a
		// configuration that failed to load.
		panic(err.Error())
	}

	if cfg.Services.Kafka.ClusterHost == "" {
		fmt.Fprintln(os.Stderr, "No Kafka selected. Run rhoas kafka use")
		panic("Missing config")
	}

	// Local clusters are expected to carry their own port; everything else
	// is reached on port 443.
	clusterURL := cfg.Services.Kafka.ClusterHost
	if !strings.HasPrefix(clusterURL, "localhost") {
		clusterURL += ":443"
	}

	conn, err := dialer.Dial("tcp", clusterURL)
	if err != nil {
		panic(err.Error())
	}

	controller, err := conn.Controller()
	if err != nil {
		// Best-effort cleanup so a recovered panic does not leak the socket;
		// the lookup error is the one worth reporting.
		_ = conn.Close()
		panic(err.Error())
	}

	// Reuse the same dialer so both connections share identical settings
	// (timeout today, SASL once it is enabled).
	controllerAddr := net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))
	controllerConn, err := dialer.Dial("tcp", controllerAddr)
	if err != nil {
		_ = conn.Close()
		panic(err.Error())
	}

	return conn, controllerConn
}
// CreateKafkaTopic creates every topic described by topicConfigs through the
// cluster controller connection.
//
// It returns an error when topicConfigs is nil, or whatever error the
// controller connection reports from CreateTopics. Both connections obtained
// from brokerConnect are closed before returning.
func CreateKafkaTopic(topicConfigs *[]kafka.TopicConfig) error {
	// Reject a nil pointer before opening any connection; dereferencing it
	// below would otherwise panic with the connections already established.
	if topicConfigs == nil {
		return fmt.Errorf("topic configs must not be nil")
	}

	conn, controllerConn := brokerConnect()
	defer conn.Close()
	defer controllerConn.Close()

	return controllerConn.CreateTopics(*topicConfigs...)
}
// DeleteKafkaTopic asks the cluster controller to delete the topic with the
// given name and returns the error reported by DeleteTopics, if any. The
// connections obtained from brokerConnect are closed before returning.
func DeleteKafkaTopic(topic string) error {
	broker, ctl := brokerConnect()
	defer broker.Close()
	defer ctl.Close()

	// DeleteTopics is variadic, so the single name is passed directly.
	err := ctl.DeleteTopics(topic)
	return err
}
// ListKafkaTopics prints one line per topic to standard error, showing the
// topic name and the number of replicas of the first partition seen for that
// topic.
//
// It returns the error from ReadPartitions if the partition metadata cannot
// be read, and nil otherwise. The connections obtained from brokerConnect
// are closed before returning.
func ListKafkaTopics() error {
	conn, controllerConn := brokerConnect()
	defer conn.Close()
	defer controllerConn.Close()

	partitions, err := conn.ReadPartitions()
	if err != nil {
		return err
	}

	// ReadPartitions yields one entry per partition, so a topic with several
	// partitions appears several times; track names to print each topic once.
	seen := make(map[string]struct{}, len(partitions))
	for i := range partitions {
		p := &partitions[i]
		if _, dup := seen[p.Topic]; dup {
			continue
		}
		seen[p.Topic] = struct{}{}

		// Explicit format verbs keep runtime values out of the format string.
		fmt.Fprintf(os.Stderr, "Name: %v (Replicas: %v)\n",
			color.HiGreenString("%s", p.Topic),
			color.HiRedString("%d", len(p.Replicas)))
	}
	return nil
}