Skip to content

Commit

Permalink
Don't rely on UriComponentsBuilder in ReactorNettyClient
Browse files Browse the repository at this point in the history
[#157001487]

References #122
  • Loading branch information
acogoluegnes committed Apr 27, 2018
1 parent 904f3a3 commit 572c756
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 27 deletions.
76 changes: 50 additions & 26 deletions src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.json.JsonObjectDecoder;
import org.reactivestreams.Publisher;
import org.springframework.web.util.UriComponentsBuilder;
import org.springframework.util.StringUtils;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.client.HttpClient;
import reactor.ipc.netty.http.client.HttpClientRequest;
import reactor.ipc.netty.http.client.HttpClientResponse;

import java.io.UnsupportedEncodingException;
import java.lang.reflect.Array;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.function.Function;
Expand All @@ -48,27 +51,36 @@ public class ReactorNettyClient {

private static final int MAX_PAYLOAD_SIZE = 100 * 1024 * 1024;

private final Mono<String> root = Mono.just("http://localhost:15672/api");
private static final String ENCODING_CHARSET = "UTF-8";

private final String rootUrl;

private final ObjectMapper objectMapper = new ObjectMapper();

private final HttpClient client;

private final String authorizationHeader;

public ReactorNettyClient() {
public ReactorNettyClient(String url) {
this(urlWithoutCredentials(url),
StringUtils.split(URI.create(url).getUserInfo(), ":")[0],
StringUtils.split(URI.create(url).getUserInfo(), ":")[1]);
}

public ReactorNettyClient(String url, String username, String password) {
rootUrl = url;
// FIXME make Jackson ObjectMapper configurable
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT);
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.disable(MapperFeature.DEFAULT_VIEW_INCLUSION);

// FIXME make URL configurable
UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromHttpUrl(root.block());
client = HttpClient.create(options -> options.host(uriBuilder.build().getHost()).port(uriBuilder.build().getPort()));
URI uri = URI.create(url);
client = HttpClient.create(options -> options.host(uri.getHost()).port(uri.getPort()));

// FIXME make Authentication header value configurable (default being Basic)
String credentials = "guest" + ":" + "guest";
String credentials = username + ":" + password;
byte[] credentialsAsBytes = credentials.getBytes(StandardCharsets.ISO_8859_1);
byte[] encodedBytes = Base64.getEncoder().encode(credentialsAsBytes);
String encodedCredentials = new String(encodedBytes, StandardCharsets.ISO_8859_1);
Expand All @@ -77,49 +89,54 @@ public ReactorNettyClient() {
// FIXME make SSLContext configurable when using TLS
}

private static String urlWithoutCredentials(String url) {
URI url1 = URI.create(url);
return StringUtils.replace(url, url1.getUserInfo() + "@", "");
}

public Mono<OverviewResponse> getOverview() {
return doGetMono(builder -> builder.pathSegment("overview"), OverviewResponse.class);
return doGetMono(OverviewResponse.class, "overview");
}

public Flux<NodeInfo> getNodes() {
return doGetFlux(builder -> builder.pathSegment("nodes"), NodeInfo.class);
return doGetFlux(NodeInfo.class, "nodes");
}

public Mono<HttpClientResponse> declarePolicy(String vhost, String name, PolicyInfo info) {
return doPost(builder -> builder.pathSegment("policies", vhost, name), info);
return doPost(info, "policies", vhost, name);
}

private HttpClientRequest disableChunkTransfer(HttpClientRequest request) {
return request.chunkedTransfer(false);
}

public Flux<PolicyInfo> getPolicies() {
return doGetFlux(builder -> builder.pathSegment("policies"), PolicyInfo.class);
return doGetFlux(PolicyInfo.class, "policies");
}

public Mono<HttpClientResponse> deletePolicy(String vhost, String name) {
return doDelete(builder -> builder.pathSegment("policies", vhost, name));
return doDelete("policies", vhost, name);
}

private <T> Mono<T> doGetMono(Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Class<T> type) {
return client.get(uri(uriTransformer), request -> Mono.just(request)
private <T> Mono<T> doGetMono(Class<T> type, String... pathSegments) {
return client.get(uri(pathSegments), request -> Mono.just(request)
.map(this::addAuthentication)
.flatMap(pRequest -> pRequest.send())).transform(decode(type));
}

private <T> Flux<T> doGetFlux(Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Class<T> type) {
return (Flux<T>) doGetMono(uriTransformer, Array.newInstance(type, 0).getClass()).flatMapMany(items -> Flux.fromArray((Object[]) items));
private <T> Flux<T> doGetFlux(Class<T> type, String... pathSegments) {
return (Flux<T>) doGetMono(Array.newInstance(type, 0).getClass(), pathSegments).flatMapMany(items -> Flux.fromArray((Object[]) items));
}

private Mono<HttpClientResponse> doPost(Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Object body) {
return client.put(uri(uriTransformer), request -> Mono.just(request)
private Mono<HttpClientResponse> doPost(Object body, String... pathSegments) {
return client.put(uri(pathSegments), request -> Mono.just(request)
.map(this::addAuthentication)
.map(this::disableChunkTransfer)
.transform(encode(body)));
}

private Mono<HttpClientResponse> doDelete(Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer) {
return client.delete(uri(uriTransformer), request -> Mono.just(request)
private Mono<HttpClientResponse> doDelete(String... pathSegments) {
return client.delete(uri(pathSegments), request -> Mono.just(request)
.map(this::addAuthentication)
.flatMap(HttpClientRequest::send)
);
Expand All @@ -129,13 +146,20 @@ private HttpClientRequest addAuthentication(HttpClientRequest request) {
return request.addHeader(HttpHeaderNames.AUTHORIZATION, authorizationHeader);
}

private String uri(Function<UriComponentsBuilder, UriComponentsBuilder> transformer) {
// FIXME encode URL without Spring dependencies
// the encoding should be simple enough to not depend on Spring just for this
return root.map(UriComponentsBuilder::fromHttpUrl)
.map(transformer)
.map(builder -> builder.build().encode())
.block().toUriString();
private String uri(String... pathSegments) {
StringBuilder builder = new StringBuilder();
if (pathSegments != null && pathSegments.length > 0) {
for (String pathSegment : pathSegments) {
try {
builder.append("/");
builder.append(URLEncoder.encode(pathSegment, ENCODING_CHARSET));
} catch (UnsupportedEncodingException e) {
// FIXME exception handling
throw new RuntimeException(e);
}
}
}
return rootUrl + builder.toString();
}

private <T> Function<Mono<HttpClientResponse>, Flux<T>> decode(Class<T> type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ReactorNettyClientSpec extends Specification {
}

protected static ReactorNettyClient newLocalhostNodeClient() {
new ReactorNettyClient()
new ReactorNettyClient("http://guest:guest@localhost:15672/api")
}

def "GET /api/overview"() {
Expand Down

0 comments on commit 572c756

Please sign in to comment.