amqp.Channel's method InspectQueue blocks when connection is lost #501

Open
PeterEFinch opened this issue Mar 21, 2021 · 1 comment

PeterEFinch commented Mar 21, 2021

A colleague and I noticed that amqp.Channel's method QueueInspect sometimes blocks indefinitely when the connection is lost. A minimal example that reproduces the behaviour is included below.

I tested this code under two different circumstances:

  1. AMQP_URL had the address of a local RabbitMQ server running in docker (rabbitmq:3-management). Once the connection was established I stopped the docker container. The program stopped as expected (see expected_output.txt).
  2. AMQP_URL had the address of our production RabbitMQ server. Once the connection was established I disconnected from the internet. The program never stopped and appeared to be stuck in the method channel.QueueInspect (see blocking_output.txt).

Additional information: I ran this code inside GoLand with go version go1.16.2 darwin/amd64. I removed the preamble but left in the exit message.

Hopefully, this problem should be reproducible.

package main

import (
	"fmt"
	"os"
	"time"

	"github.com/streadway/amqp"
)

func main() {
	const queue = "bug_testing_queue"

	url, ok := os.LookupEnv("AMQP_URL")
	if !ok {
		panic("no environment variable AMQP_URL")
	}

	connection, err := amqp.Dial(url)
	if err != nil {
		panic(err)
	}

	channel, err := connection.Channel()
	if err != nil {
		panic(err)
	}

	_, err = channel.QueueDeclare(queue, true, false, false, false, nil)
	if err != nil {
		panic(err)
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		fmt.Println("waiting")
		select {
		case <-ticker.C:
			fmt.Println("pre queue inspect")
			_, err := channel.QueueInspect(queue)
			fmt.Println("post queue inspect")
			if err != nil {
				fmt.Printf("unable to inspect queue: %v\n", err)
				return
			}
		}
	}
}

Finally, adding

notifyCh := channel.NotifyClose(make(chan *amqp.Error))

before the for loop and

case <-notifyCh:
	fmt.Println("channel closed unexpectedly")
	return

to the select statement doesn't prevent the blocking: once QueueInspect is stuck, the loop never gets back to the select.

@PeterEFinch

@michaelklishin I noticed you said that Team RabbitMQ plans to fork this library (#497 (comment)) and that @streadway lacks time (#497 (comment)). Is there any way we can move issues like this one to the planned library?
