forked from mailgun/gubernator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmultiregion.go
83 lines (71 loc) · 2.08 KB
/
multiregion.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package gubernator
import (
"github.com/mailgun/holster/v3/syncutil"
"github.com/sirupsen/logrus"
)
type mutliRegionManager struct {
reqQueue chan *RateLimitReq
wg syncutil.WaitGroup
conf BehaviorConfig
log logrus.FieldLogger
instance *V1Instance
}
func newMultiRegionManager(conf BehaviorConfig, instance *V1Instance) *mutliRegionManager {
mm := mutliRegionManager{
conf: conf,
instance: instance,
log: instance.log,
reqQueue: make(chan *RateLimitReq, conf.MultiRegionBatchLimit),
}
mm.runAsyncReqs()
return &mm
}
// QueueHits writes the RateLimitReq to be asynchronously sent to other regions
func (mm *mutliRegionManager) QueueHits(r *RateLimitReq) {
mm.reqQueue <- r
}
func (mm *mutliRegionManager) runAsyncReqs() {
var interval = NewInterval(mm.conf.MultiRegionSyncWait)
hits := make(map[string]*RateLimitReq)
mm.wg.Until(func(done chan struct{}) bool {
select {
case r := <-mm.reqQueue:
key := r.HashKey()
// Aggregate the hits into a single request
_, ok := hits[key]
if ok {
hits[key].Hits += r.Hits
} else {
hits[key] = r
}
// Send the hits if we reached our batch limit
if len(hits) == mm.conf.MultiRegionBatchLimit {
for dc, picker := range mm.instance.GetRegionPickers() {
mm.log.Debugf("Sending %v hit(s) to %s picker", len(hits), dc)
mm.sendHits(hits, picker)
}
hits = make(map[string]*RateLimitReq)
}
// Queue next interval
if len(hits) == 1 {
interval.Next()
}
case <-interval.C:
if len(hits) > 0 {
for dc, picker := range mm.instance.GetRegionPickers() {
mm.log.Debugf("Sending %v hit(s) to %s picker", len(hits), dc)
mm.sendHits(hits, picker)
}
hits = make(map[string]*RateLimitReq)
}
case <-done:
return false
}
return true
})
}
// TODO: Sending cross DC should mainly update the hits, the config should not be sent, or ignored when received
// TODO: Calculation of OVERLIMIT should not occur when sending hits cross DC
func (mm *mutliRegionManager) sendHits(r map[string]*RateLimitReq, picker PeerPicker) {
// Does nothing for now
}