-
Notifications
You must be signed in to change notification settings - Fork 0
/
share.go
93 lines (89 loc) · 2.55 KB
/
share.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// consumer is a forwarding worker. It obtains its own connection and channel
// for the sender broker via mqConnect, declares the durable topic exchange,
// the sender queue and the binding between them, and then republishes every
// delivery received on msgChan to that exchange with the sender routing key.
//
// A delivery is acknowledged only after it has been published successfully.
// If publishing fails, the delivery is rejected with requeue=true and the
// worker returns. The worker also returns when any of the setup steps fails
// or when msgChan is closed. The channel and connection are closed on return.
//
// NOTE(review): a worker that returns is not restarted here, so repeated
// publish failures reduce the number of active workers — confirm the caller
// tolerates that.
func consumer(msgChan chan amqp.Delivery, mq *SenderMq) {
	conn, channel := mqConnect(mq.SENDERURL)
	defer func() {
		// Close errors are ignored on purpose: the worker is shutting down
		// and there is nothing left to recover.
		_ = channel.Close()
		_ = conn.Close()
	}()

	err := channel.ExchangeDeclare(mq.SENDEREXCHANGE, "topic", true, false, false, false, nil)
	if fail := failOnErr(err, "exchange declare error"); fail != nil {
		return
	}
	if fail := declareQueue(channel, mq.SENDERQUEUE); fail != nil {
		return
	}
	err = channel.QueueBind(mq.SENDERQUEUE, mq.SENDEROUTINGKEY, mq.SENDEREXCHANGE, false, nil)
	if fail := failOnErr(err, "queue bind error"); fail != nil {
		return
	}

	for msg := range msgChan {
		err = channel.Publish(mq.SENDEREXCHANGE, mq.SENDEROUTINGKEY, false, false,
			amqp.Publishing{
				ContentType: msg.ContentType,
				// Forward the original encoding; this used to be filled
				// from msg.ContentType by mistake.
				ContentEncoding: msg.ContentEncoding,
				DeliveryMode:    msg.DeliveryMode,
				Body:            msg.Body,
			})
		if fail := failOnErr(err, "publish msg error"); fail != nil {
			// Hand the message back to the source queue so it is not lost.
			// The Reject error is ignored: the worker stops either way.
			_ = msg.Reject(true)
			return
		}
		log.Info("publish msg ok")
		// Ack only this delivery (multiple=false); the error is ignored as
		// best-effort, matching the rest of this worker.
		_ = msg.Ack(false)
	}
}
// declareQueue declares queueName on channel with the "x-max-priority"
// argument set to 10 and passes the resulting error, together with a fixed
// message, through failOnErr. The value returned by failOnErr is returned to
// the caller unchanged.
//
// The positional flags handed to QueueDeclare are true, false, false, true;
// per the amqp client API these are presumably durable, autoDelete,
// exclusive and noWait — confirm against the library version in use.
func declareQueue(channel *amqp.Channel, queueName string) error {
	queueArgs := amqp.Table{"x-max-priority": 10}
	_, declareErr := channel.QueueDeclare(queueName, true, false, false, true, queueArgs)
	return failOnErr(declareErr, "Fail on create queue")
}
// do connects to the receiver broker, declares the receiver queue, the
// durable topic exchange and the binding between them, and then consumes the
// queue with manual acknowledgement. Every delivery is logged and handed to a
// fixed pool of consumer workers, which republish it to the sender broker
// described by senderMq and acknowledge it.
//
// It returns the value produced by failOnErr for the first setup step that
// fails, or nil once the delivery stream from the broker has ended. The
// receiver connection and channel are closed on every return path, and the
// hand-off channel is closed on return so that the workers terminate.
//
// NOTE(review): a worker returns permanently after a publish failure; if all
// workers have returned, the hand-off send below blocks — confirm whether
// workers should be restarted.
func do(receiverMq *ReceiverMq, senderMq *SenderMq) error {
	// workerCount is the number of forwarding workers. The same value is used
	// as the prefetch limit and as the hand-off buffer size, as before.
	const workerCount = 3

	conn, err := amqp.Dial(receiverMq.RECEIVERURL)
	if fail := failOnErr(err, "connect url error"); fail != nil {
		return fail
	}
	// Register cleanup right after acquisition so that the early returns
	// below no longer leak the connection. The close error is ignored on
	// purpose: the function is already on its way out.
	defer func() {
		_ = conn.Close()
	}()

	channel, err := conn.Channel()
	if fail := failOnErr(err, "channel error"); fail != nil {
		return fail
	}
	// Deferred after conn, so the channel is closed before its connection.
	defer func() {
		_ = channel.Close()
	}()
	log.Info("connect ok")

	// Limit unacknowledged deliveries; a failure here is now reported
	// instead of being silently dropped.
	if fail := failOnErr(channel.Qos(workerCount, 0, false), "qos error"); fail != nil {
		return fail
	}
	if fail := declareQueue(channel, receiverMq.RECEIVERQUEUE); fail != nil {
		return fail
	}
	err = channel.ExchangeDeclare(receiverMq.RECEIVEREXCHANGE, "topic", true, false, false, false, nil)
	if fail := failOnErr(err, "exchange declare error"); fail != nil {
		return fail
	}
	err = channel.QueueBind(receiverMq.RECEIVERQUEUE, receiverMq.RECEIVERROUTINGKEY, receiverMq.RECEIVEREXCHANGE, false, nil)
	if fail := failOnErr(err, "queue bind error"); fail != nil {
		return fail
	}

	msgChan, err := channel.Consume(receiverMq.RECEIVERQUEUE, "GO_transfer_consumer", false, false, false, false, nil)
	if fail := failOnErr(err, "fail to get msg"); fail != nil {
		return fail
	}

	consumerChan := make(chan amqp.Delivery, workerCount)
	// This function is the only sender, so it owns the close. Closing ends
	// the workers' range loops, letting them release their own connections.
	defer close(consumerChan)
	for i := 0; i < workerCount; i++ {
		go consumer(consumerChan, senderMq)
	}

	for msg := range msgChan {
		log.Info("get message %s", bytes.NewBuffer(msg.Body))
		consumerChan <- msg
	}
	return nil
}