Skip to content

Commit

Permalink
Add options to make Reactor Netty client configurable
Browse files Browse the repository at this point in the history
[#157001487]

References #122
  • Loading branch information
acogoluegnes committed May 15, 2018
1 parent 265925f commit a1031ef
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 39 deletions.
94 changes: 55 additions & 39 deletions src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,33 +70,39 @@ public class ReactorNettyClient {

private final String rootUrl;

private final ObjectMapper objectMapper = new ObjectMapper();
private final ObjectMapper objectMapper;

private final HttpClient client;

private final Mono<String> token;

public ReactorNettyClient(String url) {
private final Function<? super Throwable, ? extends Throwable> errorHandler;

public ReactorNettyClient(String url, ReactorNettyClientOptions options) {
this(urlWithoutCredentials(url),
URI.create(url).getUserInfo().split(":")[0],
URI.create(url).getUserInfo().split(":")[1]);
URI.create(url).getUserInfo().split(":")[1], options);
}

public ReactorNettyClient(String url) {
this(url, new ReactorNettyClientOptions());
}

public ReactorNettyClient(String url, String username, String password) {
this(url, username, password, new ReactorNettyClientOptions());
}

public ReactorNettyClient(String url, String username, String password, ReactorNettyClientOptions options) {
rootUrl = url;
// FIXME make Jackson ObjectMapper configurable
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT);
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.disable(MapperFeature.DEFAULT_VIEW_INCLUSION);
objectMapper = options.objectMapper() == null ? createDefaultObjectMapper() : options.objectMapper().get();

URI uri = URI.create(url);
client = HttpClient.create(options -> options.host(uri.getHost()).port(uri.getPort()));
client = options.client() == null ?
HttpClient.create(httpOptions -> httpOptions.host(uri.getHost()).port(uri.getPort())) : options.client().get();

// FIXME make Authentication header value configurable (default being Basic)
this.token = createBasicAuthenticationToken(username, password);
this.token = options.token() == null ? createBasicAuthenticationToken(username, password) : options.token();

// FIXME make SSLContext configurable when using TLS
this.errorHandler = options.errorHandler() == null ? ReactorNettyClient::handleError : options.errorHandler();
}

private static String urlWithoutCredentials(String url) {
Expand All @@ -122,14 +128,33 @@ private static HttpClientRequest disableFailOnError(HttpClientRequest request) {
.failOnServerError(false);
}

protected Mono<String> createBasicAuthenticationToken(String username, String password) {
return Mono.fromSupplier(() -> {
String credentials = username + ":" + password;
byte[] credentialsAsBytes = credentials.getBytes(StandardCharsets.ISO_8859_1);
byte[] encodedBytes = Base64.getEncoder().encode(credentialsAsBytes);
String encodedCredentials = new String(encodedBytes, StandardCharsets.ISO_8859_1);
return "Basic " + encodedCredentials;
}).cache();
public static ObjectMapper createDefaultObjectMapper() {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT);
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.disable(MapperFeature.DEFAULT_VIEW_INCLUSION);
return objectMapper;
}

public static Mono<String> createBasicAuthenticationToken(String username, String password) {
return Mono.fromSupplier(() -> basicAuthentication(username, password)).cache();
}

public static String basicAuthentication(String username, String password) {
String credentials = username + ":" + password;
byte[] credentialsAsBytes = credentials.getBytes(StandardCharsets.ISO_8859_1);
byte[] encodedBytes = Base64.getEncoder().encode(credentialsAsBytes);
String encodedCredentials = new String(encodedBytes, StandardCharsets.ISO_8859_1);
return "Basic " + encodedCredentials;
}

@SuppressWarnings("unchecked")
public static <T extends Throwable> T handleError(T cause) {
if (cause instanceof reactor.ipc.netty.http.client.HttpClientException) {
return (T) new HttpClientException((reactor.ipc.netty.http.client.HttpClientException) cause);
}
return (T) new HttpException(cause);
}

public Mono<OverviewResponse> getOverview() {
Expand Down Expand Up @@ -325,13 +350,13 @@ public Mono<ClusterId> getClusterName() {
}

public Mono<HttpResponse> setClusterName(String name) {
if(name== null || name.isEmpty()) {
if (name == null || name.isEmpty()) {
throw new IllegalArgumentException("name cannot be null or blank");
}
return doPut(Collections.singletonMap("name", name),"cluster-name");
return doPut(Collections.singletonMap("name", name), "cluster-name");
}

@SuppressWarnings({"unchecked","rawtypes"})
@SuppressWarnings({ "unchecked", "rawtypes" })
public Flux<Map> getExtensions() {
return doGetFlux(Map.class, "extensions");
}
Expand All @@ -353,7 +378,7 @@ public Mono<QueueInfo> getQueue(String vhost, String name) {
}

public Mono<HttpResponse> declareQueue(String vhost, String name, QueueInfo info) {
return doPut(info,"queues", enc(vhost), enc(name));
return doPut(info, "queues", enc(vhost), enc(name));
}

public Mono<HttpResponse> purgeQueue(String vhost, String name) {
Expand Down Expand Up @@ -422,17 +447,17 @@ public Mono<HttpResponse> bindQueue(String vhost, String queue, String exchange,
}

public Mono<HttpResponse> bindQueue(String vhost, String queue, String exchange, String routingKey, Map<String, Object> args) {
if(vhost == null || vhost.isEmpty()) {
if (vhost == null || vhost.isEmpty()) {
throw new IllegalArgumentException("vhost cannot be null or blank");
}
if(queue == null || queue.isEmpty()) {
if (queue == null || queue.isEmpty()) {
throw new IllegalArgumentException("queue cannot be null or blank");
}
if(exchange == null || exchange.isEmpty()) {
if (exchange == null || exchange.isEmpty()) {
throw new IllegalArgumentException("exchange cannot be null or blank");
}
Map<String, Object> body = new HashMap<String, Object>();
if(!(args == null)) {
if (!(args == null)) {
body.put("args", args);
}
body.put("routing_key", routingKey);
Expand All @@ -441,7 +466,7 @@ public Mono<HttpResponse> bindQueue(String vhost, String queue, String exchange,
}

public Mono<HttpResponse> declareShovel(String vhost, ShovelInfo info) {
return doPut(info,"parameters", "shovel", enc(vhost), enc(info.getName()));
return doPut(info, "parameters", "shovel", enc(vhost), enc(info.getName()));
}

public Flux<ShovelInfo> getShovels() {
Expand All @@ -460,7 +485,7 @@ private <T> Mono<T> doGetMono(Class<T> type, String... pathSegments) {
return client.get(uri(pathSegments), request -> Mono.just(request)
.transform(this::addAuthorization)
.flatMap(HttpClientRequest::send))
.onErrorMap(this::handleError)
.onErrorMap(this.errorHandler)
.transform(decode(type));
}

Expand Down Expand Up @@ -552,13 +577,4 @@ private Function<Mono<HttpClientRequest>, Publisher<Void>> encode(Object request
}
});
}

// FIXME make this configurable
@SuppressWarnings("unchecked")
private <T extends Throwable> T handleError(T cause) {
if (cause instanceof reactor.ipc.netty.http.client.HttpClientException) {
return (T) new HttpClientException((reactor.ipc.netty.http.client.HttpClientException) cause);
}
return (T) new HttpException(cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.rabbitmq.http.client;

import com.fasterxml.jackson.databind.ObjectMapper;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.client.HttpClient;

import java.util.function.Function;
import java.util.function.Supplier;

/**
*
*/
public class ReactorNettyClientOptions {

private Supplier<HttpClient> client;

private Supplier<ObjectMapper> objectMapper;

private Mono<String> token;

private Function<? super Throwable, ? extends Throwable> errorHandler;

public Supplier<ObjectMapper> objectMapper() {
return objectMapper;
}

public ReactorNettyClientOptions objectMapper(Supplier<ObjectMapper> objectMapper) {
this.objectMapper = objectMapper;
return this;
}

public Mono<String> token() {
return token;
}

public ReactorNettyClientOptions token(Mono<String> token) {
this.token = token;
return this;
}

public Function<? super Throwable, ? extends Throwable> errorHandler() {
return errorHandler;
}

public ReactorNettyClientOptions errorHandler(Function<? super Throwable, ? extends Throwable> errorHandler) {
this.errorHandler = errorHandler;
return this;
}

public Supplier<HttpClient> client() {
return client;
}

public ReactorNettyClientOptions client(Supplier<HttpClient> client) {
this.client = client;
return this;
}
}

0 comments on commit a1031ef

Please sign in to comment.