From 5b880c814c39bba6da4cfe8a77326c6231521082 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 2 May 2018 14:37:54 +0200 Subject: [PATCH] Implement several HTTP endpoints Namely channels, virtual hosts, users, permissions. [#157001487] References #122 --- .../http/client/HttpClientException.java | 39 ++ .../rabbitmq/http/client/HttpException.java | 27 + .../http/client/ReactorNettyClient.java | 208 ++++++- .../http/client/ReactorNettyClientSpec.groovy | 515 +++++++++++++++++- 4 files changed, 765 insertions(+), 24 deletions(-) create mode 100644 src/main/java/com/rabbitmq/http/client/HttpClientException.java create mode 100644 src/main/java/com/rabbitmq/http/client/HttpException.java diff --git a/src/main/java/com/rabbitmq/http/client/HttpClientException.java b/src/main/java/com/rabbitmq/http/client/HttpClientException.java new file mode 100644 index 00000000..d401e89c --- /dev/null +++ b/src/main/java/com/rabbitmq/http/client/HttpClientException.java @@ -0,0 +1,39 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.rabbitmq.http.client; + +/** + * + */ +public class HttpClientException extends HttpException { + + public HttpClientException(reactor.ipc.netty.http.client.HttpClientException cause) { + super(cause); + } + + public int status() { + return cause().status().code(); + } + + public String reason() { + return cause().status().reasonPhrase(); + } + + private reactor.ipc.netty.http.client.HttpClientException cause() { + return (reactor.ipc.netty.http.client.HttpClientException) getCause(); + } +} diff --git a/src/main/java/com/rabbitmq/http/client/HttpException.java b/src/main/java/com/rabbitmq/http/client/HttpException.java new file mode 100644 index 00000000..eab34acf --- /dev/null +++ b/src/main/java/com/rabbitmq/http/client/HttpException.java @@ -0,0 +1,27 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.rabbitmq.http.client; + +/** + * + */ +public class HttpException extends RuntimeException { + + public HttpException(Throwable cause) { + super(cause); + } +} diff --git a/src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java b/src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java index 232ef00d..b7292747 100644 --- a/src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java +++ b/src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java @@ -21,10 +21,16 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import com.rabbitmq.http.client.domain.ChannelInfo; import com.rabbitmq.http.client.domain.ConnectionInfo; +import com.rabbitmq.http.client.domain.CurrentUserDetails; +import com.rabbitmq.http.client.domain.ExchangeInfo; import com.rabbitmq.http.client.domain.NodeInfo; import com.rabbitmq.http.client.domain.OverviewResponse; import com.rabbitmq.http.client.domain.PolicyInfo; +import com.rabbitmq.http.client.domain.UserInfo; +import com.rabbitmq.http.client.domain.UserPermissions; +import com.rabbitmq.http.client.domain.VhostInfo; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.json.JsonObjectDecoder; @@ -40,7 +46,9 @@ import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.Base64; +import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.function.UnaryOperator; @@ -52,8 +60,6 @@ public class ReactorNettyClient { private static final int MAX_PAYLOAD_SIZE = 100 * 1024 * 1024; - private static final String ENCODING_CHARSET = "UTF-8"; - private final String rootUrl; private final ObjectMapper objectMapper = new ObjectMapper(); @@ -98,6 +104,16 @@ private static HttpResponse toHttpResponse(HttpClientResponse response) { return new HttpResponse(response.status().code(), response.status().reasonPhrase(), headers); } + private static HttpClientRequest disableChunkTransfer(HttpClientRequest request) { + return request.chunkedTransfer(false); + } + + private static HttpClientRequest disableFailOnError(HttpClientRequest request) { + return request + .failOnClientError(false) + .failOnServerError(false); + } + protected Mono createBasicAuthenticationToken(String username, String password) { return Mono.fromSupplier(() -> { String credentials = username + ":" + password; @@ -117,7 +133,7 @@ public Flux getNodes() { } public Mono getNode(String name) { - return doGetMono(NodeInfo.class, "nodes", name); + return doGetMono(NodeInfo.class, "nodes", enc(name)); } public Flux getConnections() { @@ -125,23 +141,19 @@ public Flux getConnections() { } public Mono getConnection(String name) { - return doGetMono(ConnectionInfo.class, "connections", name); + return doGetMono(ConnectionInfo.class, "connections", enc(name)); } public Mono closeConnection(String name) { - return doDelete("connections", name); + return doDelete("connections", enc(name)); } public Mono closeConnection(String name, String reason) { - return doDelete(request -> request.header("X-Reason", reason), "connections", name); + return doDelete(request -> request.header("X-Reason", reason), "connections", enc(name)); } public Mono declarePolicy(String vhost, String name, PolicyInfo info) { - return doPost(info, "policies", vhost, name); - } - - private HttpClientRequest disableChunkTransfer(HttpClientRequest request) { - return request.chunkedTransfer(false); + return doPut(info, "policies", enc(vhost), enc(name)); } public Flux getPolicies() { @@ -149,30 +161,175 @@ public Flux getPolicies() { } public Mono deletePolicy(String vhost, String name) { - return doDelete("policies", vhost, name); + return doDelete("policies", enc(vhost), enc(name)); + } + + public Flux getChannels() { + return doGetFlux(ChannelInfo.class, "channels"); + } + + public Flux getChannels(String connectionName) { + return doGetFlux(ChannelInfo.class, "connections", enc(connectionName), "channels"); + } + + public Mono getChannel(String name) { + return doGetMono(ChannelInfo.class, "channels", enc(name)); + } + + public Flux getVhosts() { + return doGetFlux(VhostInfo.class, "vhosts"); + } + + public Mono getVhost(String name) { + return doGetMono(VhostInfo.class, "vhosts", enc(name)); + } + + public Mono createVhost(String name) { + return doPut("vhosts", enc(name)); + } + + public Mono deleteVhost(String name) { + return doDelete("vhosts", enc(name)); + } + + public Flux getPermissionsIn(String vhost) { + return doGetFlux(UserPermissions.class, "vhosts", enc(vhost), "permissions"); + } + + public Mono updatePermissions(String vhost, String username, UserPermissions permissions) { + return doPut(permissions, "permissions", enc(vhost), enc(username)); + } + + public Flux getUsers() { + return doGetFlux(UserInfo.class, "users"); + } + + public Mono getUser(String username) { + return doGetMono(UserInfo.class, "users", enc(username)); + } + + public Mono deleteUser(String username) { + return doDelete("users", enc(username)); + } + + public Mono createUser(String username, char[] password, List tags) { + if (username == null) { + throw new IllegalArgumentException("username cannot be null"); + } + if (password == null) { + throw new IllegalArgumentException("password cannot be null or empty. If you need to create a user that " + + "will only authenticate using an x509 certificate, use createUserWithPasswordHash with a blank hash."); + } + Map body = new HashMap(); + body.put("password", new String(password)); + if (tags == null || tags.isEmpty()) { + body.put("tags", ""); + } else { + body.put("tags", String.join(",", tags)); + } + return doPut(body, "users", enc(username)); + } + + public Mono updateUser(String username, char[] password, List tags) { + if (username == null) { + throw new IllegalArgumentException("username cannot be null"); + } + Map body = new HashMap(); + // only update password if provided + if (password != null) { + body.put("password", new String(password)); + } + if (tags == null || tags.isEmpty()) { + body.put("tags", ""); + } else { + body.put("tags", String.join(",", tags)); + } + + return doPut(body, "users", enc(username)); + } + + public Flux getPermissionsOf(String username) { + return doGetFlux(UserPermissions.class, "users", enc(username), "permissions"); + } + + public Mono createUserWithPasswordHash(String username, char[] passwordHash, List tags) { + if (username == null) { + throw new IllegalArgumentException("username cannot be null"); + } + // passwordless authentication is a thing. See + // https://github.com/rabbitmq/hop/issues/94 and https://www.rabbitmq.com/authentication.html. MK. + if (passwordHash == null) { + passwordHash = "".toCharArray(); + } + Map body = new HashMap(); + body.put("password_hash", String.valueOf(passwordHash)); + if (tags == null || tags.isEmpty()) { + body.put("tags", ""); + } else { + body.put("tags", String.join(",", tags)); + } + + return doPut(body, "users", enc(username)); + } + + public Mono whoAmI() { + return doGetMono(CurrentUserDetails.class, "whoami"); + } + + public Flux getPermissions() { + return doGetFlux(UserPermissions.class, "permissions"); + } + + public Mono getPermissions(String vhost, String username) { + return doGetMono(UserPermissions.class, "permissions", enc(vhost), enc(username)); + } + + public Mono clearPermissions(String vhost, String username) { + return doDelete("permissions", enc(vhost), enc(username)); + } + + public Flux getExchanges() { + return doGetFlux(ExchangeInfo.class, "exchanges"); + } + + public Flux getExchanges(String vhost) { + return doGetFlux(ExchangeInfo.class, "exchanges", enc(vhost)); } private Mono doGetMono(Class type, String... pathSegments) { return client.get(uri(pathSegments), request -> Mono.just(request) .transform(this::addAuthorization) - .flatMap(pRequest -> pRequest.send())).transform(decode(type)); + .flatMap(pRequest -> pRequest.send())) + .onErrorMap(this::handleError) + .transform(decode(type)); } private Flux doGetFlux(Class type, String... pathSegments) { return (Flux) doGetMono(Array.newInstance(type, 0).getClass(), pathSegments).flatMapMany(items -> Flux.fromArray((Object[]) items)); } - private Mono doPost(Object body, String... pathSegments) { + private Mono doPut(Object body, String... pathSegments) { return client.put(uri(pathSegments), request -> Mono.just(request) .transform(this::addAuthorization) - .map(this::disableChunkTransfer) + .map(ReactorNettyClient::disableChunkTransfer) + .map(ReactorNettyClient::disableFailOnError) .transform(encode(body))) .map(ReactorNettyClient::toHttpResponse); } + private Mono doPut(String... pathSegments) { + return client.put(uri(pathSegments), request -> Mono.just(request) + .transform(this::addAuthorization) + .map(ReactorNettyClient::disableChunkTransfer) + .map(ReactorNettyClient::disableFailOnError) + .flatMap(request2 -> request2.send())) + .map(ReactorNettyClient::toHttpResponse); + } + private Mono doDelete(UnaryOperator operator, String... pathSegments) { return client.delete(uri(pathSegments), request -> Mono.just(request) .transform(this::addAuthorization) + .map(ReactorNettyClient::disableFailOnError) .map(operator) .flatMap(HttpClientRequest::send) ).map(ReactorNettyClient::toHttpResponse); @@ -189,14 +346,11 @@ private Mono addAuthorization(Mono request } private String uri(String... pathSegments) { - StringBuilder builder = new StringBuilder(); - if (pathSegments != null && pathSegments.length > 0) { - for (String pathSegment : pathSegments) { - builder.append("/"); - builder.append(Utils.encode(pathSegment)); - } - } - return rootUrl + builder.toString(); + return rootUrl + "/" + String.join("/", pathSegments); + } + + private String enc(String pathSegment) { + return Utils.encode(pathSegment); } private Function, Flux> decode(Class type) { @@ -228,4 +382,12 @@ private Function, Publisher> encode(Object request } }); } + + // FIXME make this configurable + private T handleError(T cause) { + if (cause instanceof reactor.ipc.netty.http.client.HttpClientException) { + return (T) new HttpClientException((reactor.ipc.netty.http.client.HttpClientException) cause); + } + return (T) new HttpException(cause); + } } diff --git a/src/test/groovy/com/rabbitmq/http/client/ReactorNettyClientSpec.groovy b/src/test/groovy/com/rabbitmq/http/client/ReactorNettyClientSpec.groovy index fe60e996..ff0f0232 100644 --- a/src/test/groovy/com/rabbitmq/http/client/ReactorNettyClientSpec.groovy +++ b/src/test/groovy/com/rabbitmq/http/client/ReactorNettyClientSpec.groovy @@ -16,18 +16,31 @@ package com.rabbitmq.http.client +import com.rabbitmq.client.AuthenticationFailureException +import com.rabbitmq.client.Channel import com.rabbitmq.client.Connection import com.rabbitmq.client.ConnectionFactory +import com.rabbitmq.http.client.domain.ChannelInfo import com.rabbitmq.http.client.domain.ConnectionInfo +import com.rabbitmq.http.client.domain.ExchangeInfo import com.rabbitmq.http.client.domain.NodeInfo import com.rabbitmq.http.client.domain.PolicyInfo +import com.rabbitmq.http.client.domain.UserPermissions +import com.rabbitmq.http.client.domain.VhostInfo +import reactor.core.publisher.Mono +import spock.lang.IgnoreIf import spock.lang.Specification import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit +import java.util.stream.Collectors class ReactorNettyClientSpec extends Specification { + protected static final String DEFAULT_USERNAME = "guest" + + protected static final String DEFAULT_PASSWORD = "guest" + protected ReactorNettyClient client private final ConnectionFactory cf = initializeConnectionFactory() @@ -44,7 +57,9 @@ class ReactorNettyClientSpec extends Specification { } protected static ReactorNettyClient newLocalhostNodeClient() { - new ReactorNettyClient("http://guest:guest@localhost:15672/api") + new ReactorNettyClient( + String.format("http://%s:%s@127.0.0.1:15672/api", DEFAULT_USERNAME, DEFAULT_PASSWORD) + ) } def "GET /api/overview"() { @@ -245,6 +260,434 @@ class ReactorNettyClientSpec extends Specification { } } + def "GET /api/channels"() { + given: "an open RabbitMQ client connection with 1 channel" + final conn = openConnection() + final ch = conn.createChannel() + + when: "client lists channels" + + awaitEventPropagation({ client.getConnections() }) + final chs = awaitEventPropagation({ client.getChannels() }) + final chi = chs.blockFirst() + + then: "the list is returned" + verifyChannelInfo(chi, ch) + + cleanup: + if (conn.isOpen()) { + conn.close() + } + } + + def "GET /api/connections/{name}/channels/"() { + given: "an open RabbitMQ client connection with 1 channel" + final s = UUID.randomUUID().toString() + final conn = openConnection(s) + final ch = conn.createChannel() + + when: "client lists channels on that connection" + def xs = awaitEventPropagation({ client.getConnections() }) + // applying filter as some previous connections can still show up the management API + xs = xs.toStream().collect(Collectors.toList()).findAll({ + it.clientProperties.connectionName.equals(s) + }) + def cn = xs.first().name + + final chs = awaitEventPropagation({ client.getChannels(cn) }) + final chi = chs.blockFirst() + + then: "the list is returned" + verifyChannelInfo(chi, ch) + + cleanup: + if (conn.isOpen()) { + conn.close() + } + } + + def "GET /api/channels/{name}"() { + given: "an open RabbitMQ client connection with 1 channel" + final s = UUID.randomUUID().toString() + final conn = openConnection(s) + final ch = conn.createChannel() + + when: "client retrieves channel info" + + def xs = awaitEventPropagation({ client.getConnections() }) + // applying filter as some previous connections can still show up the management API + xs = xs.toStream().collect(Collectors.toList()).findAll({ + it.clientProperties.connectionName.equals(s) + }) + def cn = xs.first().name + final chs = awaitEventPropagation({ client.getChannels(cn) }).blockFirst() + + final chi = client.getChannel(chs.name).block() + + then: "the info is returned" + verifyChannelInfo(chi, ch) + + cleanup: + if (conn.isOpen()) { + conn.close() + } + } + + def "GET /api/vhosts"() { + when: "client retrieves a list of vhosts" + final vhs = client.getVhosts() + final vhi = vhs.blockFirst() + + then: "the info is returned" + verifyVhost(vhi, client.getOverview().block().getRabbitMQVersion()) + } + + def "GET /api/vhosts/{name}"() { + when: "client retrieves vhost info" + final vhi = client.getVhost("/").block() + + then: "the info is returned" + verifyVhost(vhi, client.getOverview().block().getRabbitMQVersion()) + } + + @IgnoreIf({ os.windows }) + def "PUT /api/vhosts/{name}"(String name) { + when: + "client creates a vhost named $name" + client.createVhost(name).block() + + final vhi = client.getVhost(name).block() + + then: "the vhost is created" + vhi.name == name + + cleanup: + client.deleteVhost(name).block() + + where: + name << [ + "http-created", + "http-created2", + "http_created", + "http created", + "создан по хатэтэпэ", + "creado a través de HTTP", + "通过http", + "HTTP를 통해 생성", + "HTTPを介して作成", + "created over http?", + "created @ http API", + "erstellt über http", + "http पर बनाया", + "ถูกสร้างขึ้นผ่าน HTTP", + "±!@^&#*" + ] + } + + def "DELETE /api/vhosts/{name} when vhost exists"() { + given: "a vhost named hop-test-to-be-deleted" + final s = "hop-test-to-be-deleted" + client.createVhost(s).block() + + when: "the vhost is deleted" + client.deleteVhost(s).block() + client.getVhost(s).block() + + then: "it no longer exists" + def exception = thrown(HttpClientException.class) + exception.status() == 404 + } + + def "DELETE /api/vhosts/{name} when vhost DOES NOT exist"() { + given: "no vhost named hop-test-to-be-deleted" + final s = "hop-test-to-be-deleted" + + when: "the vhost is deleted" + final response = client.deleteVhost(s).block() + + then: "the response is 404" + response.status == 404 + } + + def "GET /api/vhosts/{name}/permissions when vhost exists"() { + when: "permissions for vhost / are listed" + final s = "/" + final xs = client.getPermissionsIn(s) + + then: "they include permissions for the guest user" + UserPermissions x = xs.filter({ perm -> perm.user.equals("guest")}).blockFirst() + x.read == ".*" + } + + def "GET /api/vhosts/{name}/permissions when vhost DOES NOT exist"() { + when: "permissions for vhost trololowut are listed" + final s = "trololowut" + client.getPermissionsIn(s).blockFirst() + + then: "flux throws an exception" + def exception = thrown(HttpClientException.class) + exception.status() == 404 + } + + def "GET /api/users"() { + when: "users are listed" + final xs = client.getUsers() + final version = client.getOverview().block().getRabbitMQVersion() + + then: "a list of users is returned" + final x = xs.filter( {user -> user.name.equals("guest")} ).blockFirst() + x.name == "guest" + x.passwordHash != null + isVersion36orLater(version) ? x.hashingAlgorithm != null : x.hashingAlgorithm == null + x.tags.contains("administrator") + } + + def "GET /api/users/{name} when user exists"() { + when: "user guest if fetched" + final x = client.getUser("guest").block() + final version = client.getOverview().block().getRabbitMQVersion() + + then: "user info returned" + x.name == "guest" + x.passwordHash != null + isVersion36orLater(version) ? x.hashingAlgorithm != null : x.hashingAlgorithm == null + x.tags.contains("administrator") + } + + def "GET /api/users/{name} when user DOES NOT exist"() { + when: "user lolwut if fetched" + client.getUser("lolwut").block() + + then: "mono throws exception" + def exception = thrown(HttpClientException.class) + exception.status() == 404 + } + + def "PUT /api/users/{name} updates user tags"() { + given: "user alt-user" + final u = "alt-user" + client.deleteUser(u).subscribe( { r -> return } , { e -> return}) + client.createUser(u, u.toCharArray(), Arrays.asList("original", "management")).block() + awaitEventPropagation() + + when: "alt-user's tags are updated" + client.updateUser(u, u.toCharArray(), Arrays.asList("management", "updated")).block() + awaitEventPropagation() + + and: "alt-user info is reloaded" + final x = client.getUser(u).block() + + then: "alt-user has new tags" + x.tags.contains("updated") + !x.tags.contains("original") + } + + def "DELETE /api/users/{name}"() { + given: "user alt-user" + final u = "alt-user" + client.deleteUser(u).subscribe( { r -> return } , { e -> return}) + client.createUser(u, u.toCharArray(), Arrays.asList("original", "management")).block() + awaitEventPropagation() + + when: "alt-user is deleted" + client.deleteUser(u).block() + awaitEventPropagation() + + and: "alt-user info is reloaded" + client.getUser(u).block() + + then: "deleted user is gone" + def exception = thrown(HttpClientException.class) + exception.status() == 404 + } + + def "GET /api/users/{name}/permissions when user exists"() { + when: "permissions for user guest are listed" + final s = "guest" + final xs = client.getPermissionsOf(s) + + then: "they include permissions for the / vhost" + UserPermissions x = xs.filter( { perm -> perm.user.equals("guest")}).blockFirst() + x.read == ".*" + } + + def "GET /api/users/{name}/permissions when user DOES NOT exist"() { + when: "permissions for user trololowut are listed" + final s = "trololowut" + client.getPermissionsOf(s).blockFirst() + + then: "mono throws exception" + def exception = thrown(HttpClientException.class) + exception.status() == 404 + } + + def "PUT /api/users/{name} with a blank password hash"() { + given: "user alt-user with a blank password hash" + final u = "alt-user" + // blank password hash means only authentication using alternative + // authentication mechanisms such as x509 certificates is possible. MK. + final h = "" + client.deleteUser(u).subscribe( { r -> return } , { e -> return}) + client.createUserWithPasswordHash(u, h.toCharArray(), Arrays.asList("original", "management")).block() + client.updatePermissions("/", u, new UserPermissions(".*", ".*", ".*")).block() + + when: "alt-user tries to connect with a blank password" + openConnection("alt-user", "alt-user") + + then: "connection is refused" + // it would have a chance of being accepted if the x509 authentication mechanism was used. MK. + thrown AuthenticationFailureException + + cleanup: + client.deleteUser(u).block() + } + + def "GET /api/exchanges"() { + when: "client retrieves the list of exchanges across all vhosts" + final xs = client.getExchanges() + final x = xs.blockFirst() + + then: "the list is returned" + verifyExchangeInfo(x) + } + + def "GET /api/whoami"() { + when: "client retrieves active name authentication details" + final res = client.whoAmI().block() + + then: "the details are returned" + res.name == DEFAULT_USERNAME + res.tags ==~ /administrator/ + } + + def "GET /api/permissions"() { + when: "all permissions are listed" + final s = "guest" + final xs = client.getPermissions() + + then: "they include permissions for user guest in vhost /" + final UserPermissions x = xs + .filter( { perm -> perm.vhost.equals("/") && perm.user.equals(s)}) + .blockFirst() + x.read == ".*" + } + + def "GET /api/permissions/{vhost}/:user when both vhost and user exist"() { + when: "permissions of user guest in vhost / are listed" + final u = "guest" + final v = "/" + final UserPermissions x = client.getPermissions(v, u).block() + + then: "a single permissions object is returned" + x.read == ".*" + } + + def "GET /api/exchanges/{vhost} when vhost exists"() { + when: "client retrieves the list of exchanges in a particular vhost" + final xs = client.getExchanges("/") + + then: "the list is returned" + final x = xs.filter( { e -> e.name == "amq.fanout" } ) + verifyExchangeInfo(x.blockFirst()) + } + + def "GET /api/permissions/{vhost}/:user when vhost DOES NOT exist"() { + when: "permissions of user guest in vhost lolwut are listed" + final u = "guest" + final v = "lolwut" + client.getPermissions(v, u).block() + + then: "mono throws exception" + def exception = thrown(HttpClientException.class) + exception.status() == 404 + } + + def "GET /api/permissions/{vhost}/:user when username DOES NOT exist"() { + when: "permissions of user lolwut in vhost / are listed" + final u = "lolwut" + final v = "/" + client.getPermissions(v, u).block() + + then: "mono throws exception" + def exception = thrown(HttpClientException.class) + exception.status() == 404 + } + + def "PUT /api/permissions/{vhost}/:user when both user and vhost exist"() { + given: "vhost hop-vhost1 exists" + final v = "hop-vhost1" + client.createVhost(v).block() + and: "user hop-user1 exists" + final u = "hop-user1" + client.createUser(u, "test".toCharArray(), Arrays.asList("management", "http", "policymaker")).block() + + when: "permissions of user guest in vhost / are updated" + client.updatePermissions(v, u, new UserPermissions("read", "write", "configure")).block() + + and: "permissions are reloaded" + final UserPermissions x = client.getPermissions(v, u).block() + + then: "a single permissions object is returned" + x.read == "read" + x.write == "write" + x.configure == "configure" + + cleanup: + client.deleteVhost(v).block() + client.deleteUser(u).block() + } + + def "PUT /api/permissions/{vhost}/:user when vhost DOES NOT exist"() { + given: "vhost hop-vhost1 DOES NOT exist" + final v = "hop-vhost1" + client.deleteVhost(v).block() + and: "user hop-user1 exists" + final u = "hop-user1" + client.createUser(u, "test".toCharArray(), Arrays.asList("management", "http", "policymaker")).block() + + when: "permissions of user guest in vhost / are updated" + // throws an exception for RabbitMQ 3.7.4+ + // because of the way Cowboy 2.2.2 handles chunked transfer-encoding + // so we handle both 404 and the error + def status = client.updatePermissions(v, u, new UserPermissions("read", "write", "configure")) + .flatMap({ r -> Mono.just(r.status) }) + .onErrorReturn({ t -> "Connection closed prematurely".equals(t.getMessage())}, 500) + .block() + + then: "HTTP status is 400 BAD REQUEST or exception is thrown" + status == 400 || status == 500 + + cleanup: + client.deleteUser(u).block() + } + + def "DELETE /api/permissions/{vhost}/:user when both vhost and username exist"() { + given: "vhost hop-vhost1 exists" + final v = "hop-vhost1" + client.createVhost(v).block() + and: "user hop-user1 exists" + final u = "hop-user1" + client.createUser(u, "test".toCharArray(), Arrays.asList("management", "http", "policymaker")).block() + + and: "permissions of user guest in vhost / are set" + client.updatePermissions(v, u, new UserPermissions("read", "write", "configure")).block() + final UserPermissions x = client.getPermissions(v, u).block() + x.read == "read" + + when: "permissions are cleared" + client.clearPermissions(v, u).block() + + client.getPermissions(v, u).block() + + then: "an exception is thrown on reload" + def exception = thrown(HttpClientException.class) + exception.status() == 404 + + cleanup: + client.deleteVhost(v).block() + client.deleteUser(u).block() + } + protected Connection openConnection() { this.cf.newConnection() } @@ -253,6 +696,13 @@ class ReactorNettyClientSpec extends Specification { this.cf.newConnection(clientProvidedName) } + protected Connection openConnection(String username, String password) { + final cf = new ConnectionFactory() + cf.setUsername(username) + cf.setPassword(password) + cf.newConnection() + } + protected static void verifyNode(NodeInfo node) { assert node != null assert node.name != null @@ -262,6 +712,13 @@ class ReactorNettyClientSpec extends Specification { assert node.memoryUsed <= node.memoryLimit } + protected static void verifyExchangeInfo(ExchangeInfo x) { + assert x.type != null + assert x.durable != null + assert x.name != null + assert x.autoDelete != null + } + protected static void verifyPolicyInfo(PolicyInfo x) { assert x.name != null assert x.vhost != null @@ -276,6 +733,22 @@ class ReactorNettyClientSpec extends Specification { assert info.peerHost.equals(info.host) } + protected static void verifyChannelInfo(ChannelInfo chi, Channel ch) { + assert chi.getConsumerCount() == 0 + assert chi.number == ch.getChannelNumber() + assert chi.node.startsWith("rabbit@") + assert chi.state == "running" || + chi.state == null // HTTP API may not be refreshed yet + assert !chi.usesPublisherConfirms() + assert !chi.transactional + } + + protected static void verifyVhost(VhostInfo vhi, String version) { + assert vhi.name == "/" + assert !vhi.tracing + assert isVersion37orLater(version) ? vhi.clusterState != null : vhi.clusterState == null + } + /** * Statistics tables in the server are updated asynchronously, * in particular starting with rabbitmq/rabbitmq-management#236, @@ -304,4 +777,44 @@ class ReactorNettyClientSpec extends Specification { latch.await(10, TimeUnit.SECONDS) } + static boolean isVersion36orLater(String currentVersion) { + String v = currentVersion.replaceAll("\\+.*\$", ""); + v == "0.0.0" ? true : compareVersions(v, "3.6.0") >= 0 + } + + static boolean isVersion37orLater(String currentVersion) { + String v = currentVersion.replaceAll("\\+.*\$", ""); + v == "0.0.0" ? true : compareVersions(v, "3.7.0") >= 0 + } + + /** + * http://stackoverflow.com/questions/6701948/efficient-way-to-compare-version-strings-in-java + * + */ + static Integer compareVersions(String str1, String str2) { + String[] vals1 = str1.split("\\.") + String[] vals2 = str2.split("\\.") + int i = 0 + // set index to first non-equal ordinal or length of shortest version string + while (i < vals1.length && i < vals2.length && vals1[i] == vals2[i]) { + i++ + } + // compare first non-equal ordinal number + if (i < vals1.length && i < vals2.length) { + if(vals1[i].indexOf('-') != -1) { + vals1[i] = vals1[i].substring(0,vals1[i].indexOf('-')) + } + if(vals2[i].indexOf('-') != -1) { + vals2[i] = vals2[i].substring(0,vals2[i].indexOf('-')) + } + int diff = Integer.valueOf(vals1[i]) <=> Integer.valueOf(vals2[i]) + return Integer.signum(diff) + } + // the strings are equal or one string is a substring of the other + // e.g. "1.2.3" = "1.2.3" or "1.2.3" < "1.2.3.4" + else { + return Integer.signum(vals1.length - vals2.length) + } + } + }