forked from IBM/sarama
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfunctional_test.go
148 lines (125 loc) · 3.42 KB
/
functional_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package sarama
import (
"log"
"math/rand"
"net"
"os"
"strconv"
"strings"
"testing"
"time"
toxiproxy "github.com/Shopify/toxiproxy/client"
)
// Default endpoints used by init when the corresponding environment
// variables are unset.
// NOTE(review): the "Vagrant" prefix suggests these addresses belong to the
// project's Vagrant development VM — confirm.
const (
	// VagrantToxiproxy is the Toxiproxy address used when TOXIPROXY_ADDR is empty.
	VagrantToxiproxy = "http://192.168.100.67:8474"
	// VagrantKafkaPeers is the comma-separated broker list used when KAFKA_PEERS is empty.
	VagrantKafkaPeers = "192.168.100.67:9091,192.168.100.67:9092,192.168.100.67:9093,192.168.100.67:9094,192.168.100.67:9095"
	// VagrantZookeeperPeers is a comma-separated list of host:port pairs.
	// It is not referenced by the helpers visible in this file.
	VagrantZookeeperPeers = "192.168.100.67:2181,192.168.100.67:2182,192.168.100.67:2183,192.168.100.67:2184,192.168.100.67:2185"
)
// Shared state for the functional-test helpers in this file.
var (
	// kafkaAvailable is set by init when a TCP connection to the first broker
	// could be opened and closed without error. kafkaRequired is set by init
	// when the CI environment variable is non-empty.
	kafkaAvailable, kafkaRequired bool
	// kafkaBrokers holds the broker addresses split out of KAFKA_PEERS (or
	// the Vagrant default) by init.
	kafkaBrokers []string
	// proxyClient is the Toxiproxy client created by init.
	proxyClient *toxiproxy.Client
	// Proxies is filled by fetchProxies and cleared to nil by resetProxies.
	Proxies map[string]*toxiproxy.Proxy
	// ZKProxies and KafkaProxies are lists of names.
	// NOTE(review): presumably these are the Toxiproxy proxy names (keys of
	// Proxies) for the ZooKeeper and Kafka nodes — confirm against the
	// Toxiproxy configuration.
	ZKProxies = []string{"zk1", "zk2", "zk3", "zk4", "zk5"}
	KafkaProxies = []string{"kafka1", "kafka2", "kafka3", "kafka4", "kafka5"}
)
// init prepares the shared state used by the functional tests:
//
//   - when DEBUG is "true", Logger is replaced with one writing to stdout;
//   - math/rand is seeded from TEST_SEED if it parses as an integer,
//     otherwise from the current time;
//   - proxyClient is created for TOXIPROXY_ADDR (default VagrantToxiproxy);
//   - kafkaBrokers is split out of KAFKA_PEERS (default VagrantKafkaPeers);
//   - kafkaAvailable records whether the first broker accepts a TCP
//     connection within five seconds;
//   - kafkaRequired is true when CI is set to a non-empty value.
func init() {
	if os.Getenv("DEBUG") == "true" {
		Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	seed := time.Now().UTC().UnixNano()
	if tmp := os.Getenv("TEST_SEED"); tmp != "" {
		parsed, err := strconv.ParseInt(tmp, 0, 64)
		if err != nil {
			// A malformed seed must not silently turn into seed 0. Report it
			// through the standard logger so the message is visible
			// regardless of how Logger is configured, and keep the
			// time-based seed.
			log.Printf("Ignoring invalid TEST_SEED %q: %v", tmp, err)
		} else {
			seed = parsed
		}
	}
	Logger.Println("Using random seed:", seed)
	rand.Seed(seed)

	proxyAddr := os.Getenv("TOXIPROXY_ADDR")
	if proxyAddr == "" {
		proxyAddr = VagrantToxiproxy
	}
	proxyClient = toxiproxy.NewClient(proxyAddr)

	kafkaPeers := os.Getenv("KAFKA_PEERS")
	if kafkaPeers == "" {
		kafkaPeers = VagrantKafkaPeers
	}
	// strings.Split always returns at least one element, so indexing [0]
	// below is safe.
	kafkaBrokers = strings.Split(kafkaPeers, ",")

	// Probe only the first broker; the connection is closed again right away.
	if c, err := net.DialTimeout("tcp", kafkaBrokers[0], 5*time.Second); err == nil {
		if err = c.Close(); err == nil {
			kafkaAvailable = true
		}
	}

	kafkaRequired = os.Getenv("CI") != ""
}
// checkKafkaAvailability is a no-op when the broker probe in init succeeded.
// Otherwise it fails the test if Kafka is required (kafkaRequired) and skips
// it if not.
func checkKafkaAvailability(t testing.TB) {
	switch {
	case kafkaAvailable:
		return
	case kafkaRequired:
		t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
	default:
		t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
	}
}
// checkKafkaVersion skips the test when the KAFKA_VERSION environment
// variable names a version that does not satisfy requiredVersion. When
// KAFKA_VERSION is unset it only logs a note and lets the test continue.
func checkKafkaVersion(t testing.TB, requiredVersion string) {
	envVersion := os.Getenv("KAFKA_VERSION")
	if envVersion == "" {
		t.Logf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
		return
	}

	if parseKafkaVersion(envVersion).satisfies(parseKafkaVersion(requiredVersion)) {
		return
	}
	t.Skipf("Kafka version %s is required for this test; you have %s. Skipping...", requiredVersion, envVersion)
}
// resetProxies calls ResetState on the Toxiproxy client and drops the cached
// Proxies map. A failing reset is reported with t.Error, so the test keeps
// running and the cache is cleared either way.
func resetProxies(t testing.TB) {
	err := proxyClient.ResetState()
	if err != nil {
		t.Error(err)
	}

	Proxies = nil
}
// fetchProxies stores the result of proxyClient.Proxies() in the
// package-level Proxies map and aborts the test with t.Fatal if the call
// returned an error.
func fetchProxies(t testing.TB) {
	fetched, err := proxyClient.Proxies()
	// Proxies is overwritten before the error check, so it receives the
	// returned value even when the call failed.
	Proxies = fetched
	if err != nil {
		t.Fatal(err)
	}
}
// SaveProxy calls Save on the proxy registered under the name px in Proxies
// and aborts the test if that fails.
//
// An unknown name, or a Proxies map that has not been fetched yet, aborts the
// test with a descriptive message instead of calling Save on a nil proxy.
func SaveProxy(t *testing.T, px string) {
	// Reading from a nil map is safe in Go and simply reports ok == false.
	proxy, ok := Proxies[px]
	if !ok || proxy == nil {
		t.Fatalf("unknown proxy %q", px)
	}
	if err := proxy.Save(); err != nil {
		t.Fatal(err)
	}
}
// setupFunctionalTest prepares a functional test: it checks that Kafka is
// reachable, resets the Toxiproxy state and then loads the proxy map.
func setupFunctionalTest(t testing.TB) {
	// Runs first: when Kafka is unavailable this calls t.Fatalf or t.Skipf,
	// which stop the test before the proxy calls below are made.
	checkKafkaAvailability(t)
	resetProxies(t)
	fetchProxies(t)
}
// teardownFunctionalTest cleans up after a functional test by resetting the
// Toxiproxy state and clearing the cached Proxies map (see resetProxies).
func teardownFunctionalTest(t testing.TB) {
	resetProxies(t)
}
// kafkaVersion is a version number split into its numeric components, most
// significant first (for example "0.10.1" is kafkaVersion{0, 10, 1}).
type kafkaVersion []int

// satisfies reports whether kv is the same as or newer than other.
//
// The two versions are compared component by component. The shorter one is
// padded with zeros, so {0, 10} equals {0, 10, 0} and is older than
// {0, 10, 1}.
func (kv kafkaVersion) satisfies(other kafkaVersion) bool {
	// Compare over the longer of the two so that trailing components of
	// other are not ignored when kv has fewer of them.
	n := len(kv)
	if len(other) > n {
		n = len(other)
	}

	for i := 0; i < n; i++ {
		var have, want int
		if i < len(kv) {
			have = kv[i]
		}
		if i < len(other) {
			want = other[i]
		}
		// The first differing component decides the comparison.
		if have != want {
			return have > want
		}
	}
	return true
}
// parseKafkaVersion splits a dotted version string such as "0.10.1" into its
// components. The result always has one entry per dot-separated field.
func parseKafkaVersion(version string) kafkaVersion {
	fields := strings.Split(version, ".")
	parsed := make(kafkaVersion, len(fields))
	for i, field := range fields {
		// The conversion error is deliberately ignored: whatever value Atoi
		// returns alongside it (0 for non-numeric input) is kept, so the
		// result keeps its length.
		parsed[i], _ = strconv.Atoi(field)
	}
	return parsed
}