Skip to content

Commit

Permalink
Implement several HTTP endpoints
Browse files Browse the repository at this point in the history
Namely channels, virtual hosts, users, permissions.

[#157001487]

References #122
  • Loading branch information
acogoluegnes committed May 2, 2018
1 parent 885a409 commit 5b880c8
Show file tree
Hide file tree
Showing 4 changed files with 765 additions and 24 deletions.
39 changes: 39 additions & 0 deletions src/main/java/com/rabbitmq/http/client/HttpClientException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.rabbitmq.http.client;

/**
*
*/
public class HttpClientException extends HttpException {

public HttpClientException(reactor.ipc.netty.http.client.HttpClientException cause) {
super(cause);
}

public int status() {
return cause().status().code();
}

public String reason() {
return cause().status().reasonPhrase();
}

private reactor.ipc.netty.http.client.HttpClientException cause() {
return (reactor.ipc.netty.http.client.HttpClientException) getCause();
}
}
27 changes: 27 additions & 0 deletions src/main/java/com/rabbitmq/http/client/HttpException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.rabbitmq.http.client;

/**
*
*/
public class HttpException extends RuntimeException {

public HttpException(Throwable cause) {
super(cause);
}
}
208 changes: 185 additions & 23 deletions src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,16 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.http.client.domain.ChannelInfo;
import com.rabbitmq.http.client.domain.ConnectionInfo;
import com.rabbitmq.http.client.domain.CurrentUserDetails;
import com.rabbitmq.http.client.domain.ExchangeInfo;
import com.rabbitmq.http.client.domain.NodeInfo;
import com.rabbitmq.http.client.domain.OverviewResponse;
import com.rabbitmq.http.client.domain.PolicyInfo;
import com.rabbitmq.http.client.domain.UserInfo;
import com.rabbitmq.http.client.domain.UserPermissions;
import com.rabbitmq.http.client.domain.VhostInfo;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.json.JsonObjectDecoder;
Expand All @@ -40,7 +46,9 @@
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.UnaryOperator;
Expand All @@ -52,8 +60,6 @@ public class ReactorNettyClient {

private static final int MAX_PAYLOAD_SIZE = 100 * 1024 * 1024;

private static final String ENCODING_CHARSET = "UTF-8";

private final String rootUrl;

private final ObjectMapper objectMapper = new ObjectMapper();
Expand Down Expand Up @@ -98,6 +104,16 @@ private static HttpResponse toHttpResponse(HttpClientResponse response) {
return new HttpResponse(response.status().code(), response.status().reasonPhrase(), headers);
}

private static HttpClientRequest disableChunkTransfer(HttpClientRequest request) {
return request.chunkedTransfer(false);
}

private static HttpClientRequest disableFailOnError(HttpClientRequest request) {
return request
.failOnClientError(false)
.failOnServerError(false);
}

protected Mono<String> createBasicAuthenticationToken(String username, String password) {
return Mono.fromSupplier(() -> {
String credentials = username + ":" + password;
Expand All @@ -117,62 +133,203 @@ public Flux<NodeInfo> getNodes() {
}

public Mono<NodeInfo> getNode(String name) {
return doGetMono(NodeInfo.class, "nodes", name);
return doGetMono(NodeInfo.class, "nodes", enc(name));
}

public Flux<ConnectionInfo> getConnections() {
return doGetFlux(ConnectionInfo.class, "connections");
}

public Mono<ConnectionInfo> getConnection(String name) {
return doGetMono(ConnectionInfo.class, "connections", name);
return doGetMono(ConnectionInfo.class, "connections", enc(name));
}

public Mono<HttpResponse> closeConnection(String name) {
return doDelete("connections", name);
return doDelete("connections", enc(name));
}

public Mono<HttpResponse> closeConnection(String name, String reason) {
return doDelete(request -> request.header("X-Reason", reason), "connections", name);
return doDelete(request -> request.header("X-Reason", reason), "connections", enc(name));
}

public Mono<HttpResponse> declarePolicy(String vhost, String name, PolicyInfo info) {
return doPost(info, "policies", vhost, name);
}

private HttpClientRequest disableChunkTransfer(HttpClientRequest request) {
return request.chunkedTransfer(false);
return doPut(info, "policies", enc(vhost), enc(name));
}

public Flux<PolicyInfo> getPolicies() {
return doGetFlux(PolicyInfo.class, "policies");
}

public Mono<HttpResponse> deletePolicy(String vhost, String name) {
return doDelete("policies", vhost, name);
return doDelete("policies", enc(vhost), enc(name));
}

public Flux<ChannelInfo> getChannels() {
return doGetFlux(ChannelInfo.class, "channels");
}

public Flux<ChannelInfo> getChannels(String connectionName) {
return doGetFlux(ChannelInfo.class, "connections", enc(connectionName), "channels");
}

public Mono<ChannelInfo> getChannel(String name) {
return doGetMono(ChannelInfo.class, "channels", enc(name));
}

public Flux<VhostInfo> getVhosts() {
return doGetFlux(VhostInfo.class, "vhosts");
}

public Mono<VhostInfo> getVhost(String name) {
return doGetMono(VhostInfo.class, "vhosts", enc(name));
}

public Mono<HttpResponse> createVhost(String name) {
return doPut("vhosts", enc(name));
}

public Mono<HttpResponse> deleteVhost(String name) {
return doDelete("vhosts", enc(name));
}

public Flux<UserPermissions> getPermissionsIn(String vhost) {
return doGetFlux(UserPermissions.class, "vhosts", enc(vhost), "permissions");
}

public Mono<HttpResponse> updatePermissions(String vhost, String username, UserPermissions permissions) {
return doPut(permissions, "permissions", enc(vhost), enc(username));
}

public Flux<UserInfo> getUsers() {
return doGetFlux(UserInfo.class, "users");
}

public Mono<UserInfo> getUser(String username) {
return doGetMono(UserInfo.class, "users", enc(username));
}

public Mono<HttpResponse> deleteUser(String username) {
return doDelete("users", enc(username));
}

public Mono<HttpResponse> createUser(String username, char[] password, List<String> tags) {
if (username == null) {
throw new IllegalArgumentException("username cannot be null");
}
if (password == null) {
throw new IllegalArgumentException("password cannot be null or empty. If you need to create a user that "
+ "will only authenticate using an x509 certificate, use createUserWithPasswordHash with a blank hash.");
}
Map<String, Object> body = new HashMap<String, Object>();
body.put("password", new String(password));
if (tags == null || tags.isEmpty()) {
body.put("tags", "");
} else {
body.put("tags", String.join(",", tags));
}
return doPut(body, "users", enc(username));
}

public Mono<HttpResponse> updateUser(String username, char[] password, List<String> tags) {
if (username == null) {
throw new IllegalArgumentException("username cannot be null");
}
Map<String, Object> body = new HashMap<String, Object>();
// only update password if provided
if (password != null) {
body.put("password", new String(password));
}
if (tags == null || tags.isEmpty()) {
body.put("tags", "");
} else {
body.put("tags", String.join(",", tags));
}

return doPut(body, "users", enc(username));
}

public Flux<UserPermissions> getPermissionsOf(String username) {
return doGetFlux(UserPermissions.class, "users", enc(username), "permissions");
}

public Mono<HttpResponse> createUserWithPasswordHash(String username, char[] passwordHash, List<String> tags) {
if (username == null) {
throw new IllegalArgumentException("username cannot be null");
}
// passwordless authentication is a thing. See
// https://github.com/rabbitmq/hop/issues/94 and https://www.rabbitmq.com/authentication.html. MK.
if (passwordHash == null) {
passwordHash = "".toCharArray();
}
Map<String, Object> body = new HashMap<String, Object>();
body.put("password_hash", String.valueOf(passwordHash));
if (tags == null || tags.isEmpty()) {
body.put("tags", "");
} else {
body.put("tags", String.join(",", tags));
}

return doPut(body, "users", enc(username));
}

public Mono<CurrentUserDetails> whoAmI() {
return doGetMono(CurrentUserDetails.class, "whoami");
}

public Flux<UserPermissions> getPermissions() {
return doGetFlux(UserPermissions.class, "permissions");
}

public Mono<UserPermissions> getPermissions(String vhost, String username) {
return doGetMono(UserPermissions.class, "permissions", enc(vhost), enc(username));
}

public Mono<HttpResponse> clearPermissions(String vhost, String username) {
return doDelete("permissions", enc(vhost), enc(username));
}

public Flux<ExchangeInfo> getExchanges() {
return doGetFlux(ExchangeInfo.class, "exchanges");
}

public Flux<ExchangeInfo> getExchanges(String vhost) {
return doGetFlux(ExchangeInfo.class, "exchanges", enc(vhost));
}

private <T> Mono<T> doGetMono(Class<T> type, String... pathSegments) {
return client.get(uri(pathSegments), request -> Mono.just(request)
.transform(this::addAuthorization)
.flatMap(pRequest -> pRequest.send())).transform(decode(type));
.flatMap(pRequest -> pRequest.send()))
.onErrorMap(this::handleError)
.transform(decode(type));
}

private <T> Flux<T> doGetFlux(Class<T> type, String... pathSegments) {
return (Flux<T>) doGetMono(Array.newInstance(type, 0).getClass(), pathSegments).flatMapMany(items -> Flux.fromArray((Object[]) items));
}

private Mono<HttpResponse> doPost(Object body, String... pathSegments) {
private Mono<HttpResponse> doPut(Object body, String... pathSegments) {
return client.put(uri(pathSegments), request -> Mono.just(request)
.transform(this::addAuthorization)
.map(this::disableChunkTransfer)
.map(ReactorNettyClient::disableChunkTransfer)
.map(ReactorNettyClient::disableFailOnError)
.transform(encode(body)))
.map(ReactorNettyClient::toHttpResponse);
}

private Mono<HttpResponse> doPut(String... pathSegments) {
return client.put(uri(pathSegments), request -> Mono.just(request)
.transform(this::addAuthorization)
.map(ReactorNettyClient::disableChunkTransfer)
.map(ReactorNettyClient::disableFailOnError)
.flatMap(request2 -> request2.send()))
.map(ReactorNettyClient::toHttpResponse);
}

private Mono<HttpResponse> doDelete(UnaryOperator<HttpClientRequest> operator, String... pathSegments) {
return client.delete(uri(pathSegments), request -> Mono.just(request)
.transform(this::addAuthorization)
.map(ReactorNettyClient::disableFailOnError)
.map(operator)
.flatMap(HttpClientRequest::send)
).map(ReactorNettyClient::toHttpResponse);
Expand All @@ -189,14 +346,11 @@ private Mono<HttpClientRequest> addAuthorization(Mono<HttpClientRequest> request
}

private String uri(String... pathSegments) {
StringBuilder builder = new StringBuilder();
if (pathSegments != null && pathSegments.length > 0) {
for (String pathSegment : pathSegments) {
builder.append("/");
builder.append(Utils.encode(pathSegment));
}
}
return rootUrl + builder.toString();
return rootUrl + "/" + String.join("/", pathSegments);
}

private String enc(String pathSegment) {
return Utils.encode(pathSegment);
}

private <T> Function<Mono<HttpClientResponse>, Flux<T>> decode(Class<T> type) {
Expand Down Expand Up @@ -228,4 +382,12 @@ private Function<Mono<HttpClientRequest>, Publisher<Void>> encode(Object request
}
});
}

// FIXME make this configurable
private <T extends Throwable> T handleError(T cause) {
if (cause instanceof reactor.ipc.netty.http.client.HttpClientException) {
return (T) new HttpClientException((reactor.ipc.netty.http.client.HttpClientException) cause);
}
return (T) new HttpException(cause);
}
}
Loading

0 comments on commit 5b880c8

Please sign in to comment.