forked from DataDog/kafka-kit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcapacities_test.go
126 lines (97 loc) · 2.65 KB
/
capacities_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package main
import (
"testing"
"github.com/DataDog/kafka-kit/v4/kafkazk"
)
// TestBrokerReplicationCapacities checks the per-broker, per-role rates
// returned by brokerReplicationCapacities against a fixed table. Reassignment
// data comes from the kafkazk stub and broker metrics from stubBrokerMetrics.
func TestBrokerReplicationCapacities(t *testing.T) {
	zk := &kafkazk.Stub{}

	reassignments := zk.GetReassignments()
	reassigningBrokers, err := getReassigningBrokers(reassignments, zk)
	if err != nil {
		t.Fatalf("Unexpected error from getReassigningBrokers: %v", err)
	}

	lim, err := NewLimits(NewLimitsConfig{
		Minimum:            20,
		SourceMaximum:      90,
		DestinationMaximum: 80,
		CapacityMap:        map[string]float64{"stub": 200.00},
	})
	if err != nil {
		t.Fatalf("Unexpected error from NewLimits: %v", err)
	}

	rtc := &ThrottleManager{
		reassignments:          reassignments,
		previouslySetThrottles: replicationCapacityByBroker{1000: throttleByRole{float64ptr(20)}},
		limits:                 lim,
	}

	bm := stubBrokerMetrics()
	brc, err := brokerReplicationCapacities(rtc, reassigningBrokers, bm)
	if err != nil {
		t.Fatalf("Unexpected error from brokerReplicationCapacities: %v", err)
	}

	// Expected rates keyed by broker ID; each value is indexed by role.
	expected := map[int][2]*float64{
		1000: {float64ptr(126.00), nil},
		1002: {float64ptr(108.00), nil},
		1003: {nil, float64ptr(96.00)},
		1005: {nil, float64ptr(20.00)},
		1010: {nil, float64ptr(64.00)},
	}

	// Ranging only over the results would let a missing broker go unnoticed,
	// so first confirm that every expected broker is present.
	for id := range expected {
		if _, ok := brc[id]; !ok {
			t.Errorf("Expected capacities for ID %d, got none", id)
		}
	}

	for id, got := range brc {
		// want is all-nil for any ID that is absent from the expected table.
		want := expected[id]
		for i := range got {
			role := roleFromIndex(i)
			switch {
			case got[i] == nil && want[i] == nil:
				// Both unset; nothing to compare.
			case got[i] == nil:
				t.Errorf("Expected rate %.2f, got nil for ID %d role %s", *want[i], id, role)
			case want[i] == nil:
				// Guard against dereferencing a nil expectation, which would
				// panic rather than report a test failure.
				t.Errorf("Expected nil rate, got %.2f for ID %d role %s", *got[i], id, role)
			case *got[i] != *want[i]:
				t.Errorf("Expected rate %.2f, got %.2f for ID %d role %s",
					*want[i], *got[i], id, role)
			}
		}
	}
}
func float64ptr(f float64) *float64 {
return &f
}
// TestStoreLeaderCapacity stores a leader rate for one broker and checks that
// only the leader slot of that broker's entry is populated.
func TestStoreLeaderCapacity(t *testing.T) {
	byBroker := replicationCapacityByBroker{}
	byBroker.storeLeaderCapacity(1001, 100)

	rates := byBroker[1001]
	// Index 0 is the leader position, index 1 the follower position.
	leader, follower := rates[0], rates[1]

	if leader == nil {
		t.Fatal("Unexpected nil value")
	}
	if *leader != 100 {
		t.Errorf("Expected value '100', got '%f'", *leader)
	}

	// The follower slot must remain unset.
	if follower != nil {
		t.Errorf("Expected nil value, got %v", *follower)
	}
}
// TestStoreFollowerCapacity stores a follower rate for one broker and checks
// that only the follower slot of that broker's entry is populated.
func TestStoreFollowerCapacity(t *testing.T) {
	byBroker := replicationCapacityByBroker{}
	byBroker.storeFollowerCapacity(1001, 100)

	rates := byBroker[1001]
	// Index 0 is the leader position, index 1 the follower position.
	leader, follower := rates[0], rates[1]

	if follower == nil {
		t.Fatal("Unexpected nil value")
	}
	if *follower != 100 {
		t.Errorf("Expected value '100', got '%f'", *follower)
	}

	// The leader slot must remain unset.
	if leader != nil {
		t.Errorf("Expected nil value, got %v", *leader)
	}
}
// TestReset populates a replicationCapacityByBroker with three broker IDs and
// checks that reset leaves it with no entries.
func TestReset(t *testing.T) {
	rates := replicationCapacityByBroker{}
	brokerIDs := []int{1001, 1002, 1003}
	rates.setAllRatesWithDefault(brokerIDs, 100)

	// One entry per broker ID is expected before the reset.
	if n := len(rates); n != 3 {
		t.Errorf("Expected len 3, got %d", n)
	}

	rates.reset()

	// After the reset no entries should remain.
	if n := len(rates); n != 0 {
		t.Errorf("Expected len 0, got %d", n)
	}
}