Skip to content

Commit

Permalink
Add test to check policy definition in queue info
Browse files Browse the repository at this point in the history
References #460
  • Loading branch information
acogoluegnes committed Jun 17, 2024
1 parent 16da9b3 commit 88bdee8
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 5 deletions.
2 changes: 1 addition & 1 deletion README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ c.getBindingsByDestination("/", "an.exchange");
Start the broker:

```sh
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.12-management
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
```

Configure the broker for the test suite:
Expand Down
35 changes: 31 additions & 4 deletions src/test/groovy/com/rabbitmq/http/client/ClientSpec.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference

import static com.rabbitmq.http.client.domain.DestinationType.EXCHANGE
import static com.rabbitmq.http.client.domain.DestinationType.QUEUE
import static java.util.Collections.singletonMap

class ClientSpec extends Specification {

Expand Down Expand Up @@ -597,7 +598,7 @@ class ClientSpec extends Specification {
properties.put("delivery_mode", 1)
properties.put("content_type", "text/plain")
properties.put("priority", 5)
properties.put("headers", Collections.singletonMap("header1", "value1"))
properties.put("headers", singletonMap("header1", "value1"))
def routed = client.publish(v, "amq.direct", q,
new OutboundMessage().payload("Hello world!").utf8Encoded().properties(properties))

Expand Down Expand Up @@ -685,7 +686,33 @@ class ClientSpec extends Specification {

}


def "GET /api/queues/{vhost}/{name} with policy definition"() {
given: "a policy applies to all queues"
def v = "/"
def s = "hop.test"
def pd = singletonMap("expires", 30000)
def pi = new PolicyInfo(".*", 1, "queues", pd)
client.declarePolicy(v, s, pi)

when: "a queue is created"
Connection conn = cf.newConnection()
Channel ch = conn.createChannel()
String q = ch.queueDeclare().queue

then: "the policy definition should be listed in the queue info"
waitAtMostUntilTrue(10, {
def qi = client.getQueue(v, q)
return qi != null && qi.effectivePolicyDefinition != null
})
def queueInfo = client.getQueue(v, q)
queueInfo.effectivePolicyDefinition == pd

cleanup:
ch.queueDelete(q)
conn.close()
client.deletePolicy(v, s)
}

def "GET /api/queues with details"() {
given: "at least one queue was declared and some messages published"
Connection conn = cf.newConnection()
Expand Down Expand Up @@ -1481,7 +1508,7 @@ class ClientSpec extends Specification {
def messageCount = 5
def properties = new AMQP.BasicProperties.Builder()
.contentType("text/plain").deliveryMode(1).priority(5)
.headers(Collections.singletonMap("header1", "value1"))
.headers(singletonMap("header1", "value1"))
.build()
(1..messageCount).each { it ->
ch.basicPublish("", q, properties, "payload${it}".getBytes(Charset.forName("UTF-8")))
Expand Down Expand Up @@ -2743,7 +2770,7 @@ class ClientSpec extends Specification {
p.setApplyTo("exchanges")
p.setName(policyName)
p.setPattern("amq\\.topic")
p.setDefinition(Collections.singletonMap("federation-upstream-set", upstreamSetName))
p.setDefinition(singletonMap("federation-upstream-set", upstreamSetName))
client.declarePolicy(vhost, policyName, p)

when: "client requests the upstream set list"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ import java.util.stream.Collectors

import static com.rabbitmq.http.client.domain.DestinationType.EXCHANGE
import static com.rabbitmq.http.client.domain.DestinationType.QUEUE
import static java.util.Collections.singletonMap

class ReactorNettyClientSpec extends Specification {

Expand Down Expand Up @@ -1246,6 +1247,33 @@ class ReactorNettyClientSpec extends Specification {
conn.close()
}

def "GET /api/queues/{vhost}/{name} with policy definition"() {
given: "a policy applies to all queues"
def v = "/"
def s = "hop.test"
def pd = singletonMap("expires", 30000)
def pi = new PolicyInfo(".*", 1, "queues", pd)
client.declarePolicy(v, s, pi).block()

when: "a queue is created"
Connection conn = cf.newConnection()
Channel ch = conn.createChannel()
String q = ch.queueDeclare().queue

then: "the policy definition should be listed in the queue info"
waitAtMostUntilTrue(10, {
def qi = client.getQueue(v, q).block()
return qi != null && qi.effectivePolicyDefinition != null
})
def queueInfo = client.getQueue(v, q).block()
queueInfo.effectivePolicyDefinition == pd

cleanup:
ch.queueDelete(q)
conn.close()
client.deletePolicy(v, s).block()
}

def "GET /api/queues with details"() {
given: "at least one queue was declared and some messages published"
Connection conn = cf.newConnection()
Expand Down

0 comments on commit 88bdee8

Please sign in to comment.