-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessageQ.go
120 lines (108 loc) · 2.9 KB
/
messageQ.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package main
import (
"encoding/json"
"errors"
"time"
"github.com/streadway/amqp"
)
// Breaker is a simple circuit breaker around Operation. Its Status field holds
// one of the strings "Closed", "Open" or "HalfOpen", as assigned by
// GetBreakerInstance and by the Open, Close and Do methods.
type Breaker struct {
Status string // Current breaker state: "Closed", "Open" or "HalfOpen"
FailCount int // Number of failed operations counted by Do while the breaker is not "HalfOpen"; reset to 0 by Close
LastFail time.Time // Time of the most recent failed operation (GetBreakerInstance initialises it to the creation time)
FailThreshold int // Once FailCount reaches this value, the next Do call on a "Closed" breaker opens it
SuccessThreshold time.Duration // Minimum time that must have passed since LastFail for a successful Do to call Close (Status "Closed", FailCount 0)
OpenThreshold time.Duration // How long the breaker stays "Open" after Open is called before Status changes to "HalfOpen"
Operation func(i interface{}) error // Guarded operation; Do invokes it with the value it was given
}
// FileManager forwards messages to a pluggable add-message function, which
// lets callers swap the default sender for another implementation.
type FileManager struct {
AddMessageFunction func(message interface{}) error // Callback that MessageAdd invokes with the message
}
// MessageAdd passes message to the configured AddMessageFunction and returns
// whatever error that function reports.
//
// A FileManager whose AddMessageFunction is nil (for example the zero value)
// yields an error instead of panicking on a nil function call.
func (f FileManager) MessageAdd(message interface{}) error {
	if f.AddMessageFunction == nil {
		return errors.New("file manager: AddMessageFunction is not set")
	}
	return f.AddMessageFunction(message)
}
// MessageSend JSON-encodes message and publishes it as an "application/json"
// AMQP message, using the empty exchange name and QName as the routing key, to
// the broker addressed by QConnectionString.
//
// The payload is marshalled before any network activity, so a message that
// cannot be encoded fails fast without opening a connection. A new connection
// and channel are opened on every call and closed again on return.
//
// It returns the first error produced by marshalling, dialling, opening the
// channel or publishing (unwrapped), and nil on success.
func MessageSend(message interface{}) error {
	payload, err := json.Marshal(message)
	if err != nil {
		return err
	}

	conn, err := amqp.Dial(QConnectionString)
	if err != nil {
		return err
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		return err
	}
	defer ch.Close()

	// The two false arguments are Publish's mandatory and immediate flags.
	return ch.Publish("", QName, false, false, amqp.Publishing{
		ContentType: "application/json",
		Body:        payload,
	})
}
// GetFMDefaultInstance returns a FileManager whose AddMessageFunction is
// MessageSend.
func GetFMDefaultInstance() FileManager {
	return FileManager{AddMessageFunction: MessageSend}
}
// GetFMOverLoadInstace returns a FileManager that uses the supplied function
// as its AddMessageFunction instead of the default sender.
func GetFMOverLoadInstace(function func(message interface{}) error) FileManager {
	return FileManager{AddMessageFunction: function}
}
// GetBreakerInstance builds a Breaker that guards function. The breaker starts
// "Closed" with no recorded failures, tolerates 3 failures, stays open for 30
// seconds and requires one minute without failures before it resets. LastFail
// is initialised to the moment of construction.
func GetBreakerInstance(function func(iv interface{}) error) Breaker {
	return Breaker{
		Operation:        function,
		Status:           "Closed",
		FailCount:        0,
		FailThreshold:    3,
		LastFail:         time.Now(),
		OpenThreshold:    30 * time.Second,
		SuccessThreshold: time.Minute,
	}
}
// Open trips the breaker: Status becomes "Open" immediately and, once
// OpenThreshold has elapsed, a timer moves it on to "HalfOpen".
//
// The delayed transition is applied only if Status is still "Open" when the
// timer fires, so a Close (or any other status change) made during the open
// window is not overwritten by a stale timer.
//
// NOTE(review): the timer callback runs on its own goroutine and reads and
// writes Status without locking, so it can race with Do or Close running at
// the same moment. Making that safe needs a mutex (or similar) on Breaker.
func (b *Breaker) Open() {
	b.Status = "Open"
	time.AfterFunc(b.OpenThreshold, func() {
		if b.Status == "Open" {
			b.Status = "HalfOpen"
		}
	})
}
// Do runs the guarded Operation with iv, subject to the breaker state.
//
//   - "Closed" with FailCount at or above FailThreshold: the breaker is opened
//     and an error is returned without running the operation.
//   - "Open": an error is returned without running the operation.
//   - Otherwise the operation runs. On failure its error is returned and
//     LastFail is updated; a "HalfOpen" breaker is re-opened, any other state
//     has its FailCount incremented. On success nil is returned, and the
//     breaker is closed if at least SuccessThreshold has passed since LastFail.
func (b *Breaker) Do(iv interface{}) error {
	// Fast-fail paths: trip the breaker, or reject while it is already open.
	switch {
	case b.Status == "Closed" && b.FailCount >= b.FailThreshold:
		b.Open()
		return errors.New("fail treshold exceeded")
	case b.Status == "Open":
		return errors.New("fail treshold exceeded")
	}

	if opErr := b.Operation(iv); opErr != nil {
		switch b.Status {
		case "HalfOpen":
			// A failed trial call sends the breaker straight back to open.
			b.Open()
		default:
			b.FailCount++
		}
		b.LastFail = time.Now()
		return opErr
	}

	// Success: reset only after a long enough failure-free period.
	healthy := b.Status == "HalfOpen" || b.Status == "Closed"
	if healthy && time.Since(b.LastFail) >= b.SuccessThreshold {
		b.Close()
	}
	return nil
}
// Close returns the breaker to the "Closed" state and clears the failure
// counter.
func (b *Breaker) Close() {
	b.FailCount = 0
	b.Status = "Closed"
}