Skip to content

Commit

Permalink
Add missing tests for Reactor Netty client
Browse files Browse the repository at this point in the history
Exchanges & bindings.

[#157001487]

References #122
  • Loading branch information
acogoluegnes committed May 15, 2018
1 parent a1031ef commit f31b4f2
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ public Flux<ExchangeInfo> getExchanges(String vhost) {
return doGetFlux(ExchangeInfo.class, "exchanges", enc(vhost));
}

public Mono<ExchangeInfo> getExchange(String vhost, String name) {
return doGetMono(ExchangeInfo.class, "exchanges", enc(vhost), enc(name));
}

public Mono<HttpResponse> declareExchange(String vhost, String name, ExchangeInfo info) {
return doPut(info, "exchanges", enc(vhost), enc(name));
}
Expand Down
102 changes: 102 additions & 0 deletions src/test/groovy/com/rabbitmq/http/client/ReactorNettyClientSpec.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,108 @@ class ReactorNettyClientSpec extends Specification {
client.deleteShovel("/","shovel1").block()
}

def "DELETE /api/exchanges/{vhost}/{name}"() {
given: "fanout exchange hop.test in vhost /"
final v = "/"
final s = "hop.test"
client.declareExchange(v, s, new ExchangeInfo("fanout", false, false)).block()

final xs = client.getExchanges(v)
final x = xs.filter( { e -> e.name == s } )
verifyExchangeInfo(x.blockFirst())

when: "client deletes exchange hop.test in vhost /"
client.deleteExchange(v, s).block()

and: "exchange list in / is reloaded"
xs = client.getExchanges(v)

then: "hop.test no longer exists"
!xs.filter( { e -> e.name == s } ).hasElements().block()
}

def "GET /api/exchanges/{vhost} when vhost DOES NOT exist"() {
given: "vhost lolwut does not exist"
final v = "lolwut"
client.deleteVhost(v).block()

when: "client retrieves the list of exchanges in that vhost"
client.getExchanges(v).blockFirst()

then: "exception is thrown"
def exception = thrown(HttpClientException.class)
exception.status() == 404
}

def "GET /api/exchanges/{vhost}/{name} when both vhost and exchange exist"() {
when: "client retrieves exchange amq.fanout in vhost /"
final xs = client.getExchange("/", "amq.fanout")

then: "exchange info is returned"
final x = xs.filter( { e -> e.name == "amq.fanout" && e.vhost == "/" })
verifyExchangeInfo(x.block())
}

def "GET /api/exchanges/{vhost}/{name}/bindings/destination"() {
given: "an exchange named hop.exchange1 which is bound to amq.fanout"
final conn = openConnection()
final ch = conn.createChannel()
final src = "amq.fanout"
final dest = "hop.exchange1"
ch.exchangeDeclare(dest, "fanout")
ch.exchangeBind(dest, src, "")

when: "client lists bindings of amq.fanout"
final xs = client.getExchangeBindingsByDestination("/", dest)

then: "there is a binding for hop.exchange1"
final x = xs.filter( { b -> b.source == src &&
b.destinationType == "exchange" &&
b.destination == dest
} )
x.hasElements().block()

cleanup:
ch.exchangeDelete(dest)
conn.close()
}

def "GET /api/exchanges/{vhost}/{name}/bindings/source"() {
given: "a queue named hop.queue1"
final conn = openConnection()
final ch = conn.createChannel()
final q = "hop.queue1"
ch.queueDeclare(q, false, false, false, null)

when: "client lists bindings of default exchange"
final xs = client.getExchangeBindingsBySource("/", "")

then: "there is an automatic binding for hop.queue1"
final x = xs.filter( { b -> b.source == "" && b.destinationType == "queue" && b.destination == q } )
x.hasElements().block()

cleanup:
ch.queueDelete(q)
conn.close()
}

def "PUT /api/exchanges/{vhost}/{name} when vhost exists"() {
given: "fanout exchange hop.test in vhost /"
final v = "/"
final s = "hop.test"
client.declareExchange(v, s, new ExchangeInfo("fanout", false, false)).block()

when: "client lists exchanges in vhost /"
final xs = client.getExchanges(v)

then: "hop.test is listed"
final x = xs.filter( { e -> e.name == s } )
verifyExchangeInfo(x.blockFirst())

cleanup:
client.deleteExchange(v, s).block()
}

protected Connection openConnection() {
this.cf.newConnection()
}
Expand Down

0 comments on commit f31b4f2

Please sign in to comment.