-
Describe the bugThanks for reading, it could be my problem, but I have been troubleshooting for a long time without finding the cause. Here is my source code, I'm doing a practice on confirm mode and return mode
Reproduction steps
package main
import (
"context"
"github.com/rabbitmq/amqp091-go"
"log"
"time"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// open confirm mode
err = ch.Confirm(false)
failOnError(err, "Failed to open confirm mode")
// NotifyPublish
confirmNotice := ch.NotifyPublish(make(chan amqp091.Confirmation, 1))
go func() {
notice := <-confirmNotice
log.Printf("【<-】confirm notice: %v", notice)
// ⭐️ for range does not fetch the data in chan, but it clearly has data
for notice := range confirmNotice {
log.Printf("【range】confirm notice: %v", notice)
}
// ⭐️ It can fetch the same acknowledgement countless times
for i := 0; i < 20; i++ {
notice = <-confirmNotice
log.Printf("【<-】confirm notice: %v", notice)
}
}()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := "Hello World!"
// Publish a message to a non-existent switch, it should return false to `NotifyPublish` once
err = ch.PublishWithContext(ctx,
"xxxyyy", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp091.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
var forever chan struct{}
<-forever
}
// I can't find where to open return mode
// NotifyReturn
returnNotice := ch.NotifyReturn(make(chan amqp091.Return, 1))
go func() {
for notice := range returnNotice {
log.Printf("return notice: %v", notice)
}
}()
// To publish a message to a non-existent queue, it should return false once to `NotifyReturn`.
err = ch.PublishWithContext(ctx,
"", // exchange
"zzz", // routing key
false, // mandatory
false, // immediate
amqp091.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body) Expected behaviorI hope to get a solution to this problem, thanks! |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 2 replies
-
hi, @lukebakken |
Beta Was this translation helpful? Give feedback.
-
Hi @wushu037, I can confirm that there is no bug, the code is working as intended. I don't know what are the requirements for your practice; I guess you have to demonstrate publish confirm and return (see below). For when you will receive a publish confirmation, I'm going to quote the documentation:
In your source code, the You are able to receive from the channel because Golang's receive operator always returns on a closed channel. The In order to get a Here is your source code modified to demonstrate publish confirm, return and channel exception. issue196.gopackage main
import (
"context"
"github.com/rabbitmq/amqp091-go"
"log"
"time"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// open confirm mode
err = ch.Confirm(false)
failOnError(err, "Failed to open confirm mode")
// NotifyPublish
confirmNotice := ch.NotifyPublish(make(chan amqp091.Confirmation, 1))
go func() {
// ⭐️ for range does fetch the data in chan :-)
for notice := range confirmNotice {
// publish confirms that the server received the message
// it does NOT mean that the message was routed to a queue
log.Printf("【range】confirm notice: %v", notice)
}
// ⭐️ channel is closed due to a channel exception (publish to non-existing exchange)
// you can receive from the channel, but you will get the zero-value of Confirmation
for i := 0; i < 20; i++ {
notice, ok := <-confirmNotice
log.Printf("【<-】confirm notice (ok: %v): %v", ok, notice)
}
}()
// NotifyReturn
returnNotice := ch.NotifyReturn(make(chan amqp091.Return, 1))
go func() {
for notice := range returnNotice {
log.Printf("【range】return notice: %v", notice)
}
}()
// NotifyClose
channelNotice := ch.NotifyClose(make(chan *amqp091.Error, 1))
go func() {
select {
case notice := <-channelNotice:
log.Printf("【select】channel notice: %v", notice)
}
}()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := "Hello World!"
// exchange exists, but message is undeliverable because there is no binding to route the message
// it will notify return because it has mandatory flag
err = ch.PublishWithContext(ctx,
"", // exchange
"this-does-not-exist", // routing key
true, // mandatory
false, // immediate
amqp091.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
// Publish a message to a non-existent switch, it will cause a channel exception,
// close the channel and notify channel close
err = ch.PublishWithContext(ctx,
"xxxyyy", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp091.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
var forever chan struct{}
<-forever
} Edit: RabbitMQ does not support |
Beta Was this translation helpful? Give feedback.
-
Hi @Zerpet , Also I found that rabbitmq 3.0 deprecates |
Beta Was this translation helpful? Give feedback.
Hi @wushu037,
I can confirm that there is no bug, the code is working as intended. I don't know what are the requirements for your practice; I guess you have to demonstrate publish confirm and return (see below).
For when you will receive a publish confirmation, I'm going to quote the documentation:
In your source code, the
PublishWithContext
function suceeds (i.e. sends the message …