diff --git a/src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java b/src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java index c3820ce9..3a1e554e 100644 --- a/src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java +++ b/src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java @@ -70,33 +70,39 @@ public class ReactorNettyClient { private final String rootUrl; - private final ObjectMapper objectMapper = new ObjectMapper(); + private final ObjectMapper objectMapper; private final HttpClient client; private final Mono token; - public ReactorNettyClient(String url) { + private final Function errorHandler; + + public ReactorNettyClient(String url, ReactorNettyClientOptions options) { this(urlWithoutCredentials(url), URI.create(url).getUserInfo().split(":")[0], - URI.create(url).getUserInfo().split(":")[1]); + URI.create(url).getUserInfo().split(":")[1], options); + } + + public ReactorNettyClient(String url) { + this(url, new ReactorNettyClientOptions()); } public ReactorNettyClient(String url, String username, String password) { + this(url, username, password, new ReactorNettyClientOptions()); + } + + public ReactorNettyClient(String url, String username, String password, ReactorNettyClientOptions options) { rootUrl = url; - // FIXME make Jackson ObjectMapper configurable - objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); - objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT); - objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); - objectMapper.disable(MapperFeature.DEFAULT_VIEW_INCLUSION); + objectMapper = options.objectMapper() == null ? createDefaultObjectMapper() : options.objectMapper().get(); URI uri = URI.create(url); - client = HttpClient.create(options -> options.host(uri.getHost()).port(uri.getPort())); + client = options.client() == null ? + HttpClient.create(httpOptions -> httpOptions.host(uri.getHost()).port(uri.getPort())) : options.client().get(); - // FIXME make Authentication header value configurable (default being Basic) - this.token = createBasicAuthenticationToken(username, password); + this.token = options.token() == null ? createBasicAuthenticationToken(username, password) : options.token(); - // FIXME make SSLContext configurable when using TLS + this.errorHandler = options.errorHandler() == null ? ReactorNettyClient::handleError : options.errorHandler(); } private static String urlWithoutCredentials(String url) { @@ -122,14 +128,33 @@ private static HttpClientRequest disableFailOnError(HttpClientRequest request) { .failOnServerError(false); } - protected Mono createBasicAuthenticationToken(String username, String password) { - return Mono.fromSupplier(() -> { - String credentials = username + ":" + password; - byte[] credentialsAsBytes = credentials.getBytes(StandardCharsets.ISO_8859_1); - byte[] encodedBytes = Base64.getEncoder().encode(credentialsAsBytes); - String encodedCredentials = new String(encodedBytes, StandardCharsets.ISO_8859_1); - return "Basic " + encodedCredentials; - }).cache(); + public static ObjectMapper createDefaultObjectMapper() { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT); + objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + objectMapper.disable(MapperFeature.DEFAULT_VIEW_INCLUSION); + return objectMapper; + } + + public static Mono createBasicAuthenticationToken(String username, String password) { + return Mono.fromSupplier(() -> basicAuthentication(username, password)).cache(); + } + + public static String basicAuthentication(String username, String password) { + String credentials = username + ":" + password; + byte[] credentialsAsBytes = credentials.getBytes(StandardCharsets.ISO_8859_1); + byte[] encodedBytes = Base64.getEncoder().encode(credentialsAsBytes); + String encodedCredentials = new String(encodedBytes, StandardCharsets.ISO_8859_1); + return "Basic " + encodedCredentials; + } + + @SuppressWarnings("unchecked") + public static T handleError(T cause) { + if (cause instanceof reactor.ipc.netty.http.client.HttpClientException) { + return (T) new HttpClientException((reactor.ipc.netty.http.client.HttpClientException) cause); + } + return (T) new HttpException(cause); } public Mono getOverview() { @@ -325,13 +350,13 @@ public Mono getClusterName() { } public Mono setClusterName(String name) { - if(name== null || name.isEmpty()) { + if (name == null || name.isEmpty()) { throw new IllegalArgumentException("name cannot be null or blank"); } - return doPut(Collections.singletonMap("name", name),"cluster-name"); + return doPut(Collections.singletonMap("name", name), "cluster-name"); } - @SuppressWarnings({"unchecked","rawtypes"}) + @SuppressWarnings({ "unchecked", "rawtypes" }) public Flux getExtensions() { return doGetFlux(Map.class, "extensions"); } @@ -353,7 +378,7 @@ public Mono getQueue(String vhost, String name) { } public Mono declareQueue(String vhost, String name, QueueInfo info) { - return doPut(info,"queues", enc(vhost), enc(name)); + return doPut(info, "queues", enc(vhost), enc(name)); } public Mono purgeQueue(String vhost, String name) { @@ -422,17 +447,17 @@ public Mono bindQueue(String vhost, String queue, String exchange, } public Mono bindQueue(String vhost, String queue, String exchange, String routingKey, Map args) { - if(vhost == null || vhost.isEmpty()) { + if (vhost == null || vhost.isEmpty()) { throw new IllegalArgumentException("vhost cannot be null or blank"); } - if(queue == null || queue.isEmpty()) { + if (queue == null || queue.isEmpty()) { throw new IllegalArgumentException("queue cannot be null or blank"); } - if(exchange == null || exchange.isEmpty()) { + if (exchange == null || exchange.isEmpty()) { throw new IllegalArgumentException("exchange cannot be null or blank"); } Map body = new HashMap(); - if(!(args == null)) { + if (!(args == null)) { body.put("args", args); } body.put("routing_key", routingKey); @@ -441,7 +466,7 @@ public Mono bindQueue(String vhost, String queue, String exchange, } public Mono declareShovel(String vhost, ShovelInfo info) { - return doPut(info,"parameters", "shovel", enc(vhost), enc(info.getName())); + return doPut(info, "parameters", "shovel", enc(vhost), enc(info.getName())); } public Flux getShovels() { @@ -460,7 +485,7 @@ private Mono doGetMono(Class type, String... pathSegments) { return client.get(uri(pathSegments), request -> Mono.just(request) .transform(this::addAuthorization) .flatMap(HttpClientRequest::send)) - .onErrorMap(this::handleError) + .onErrorMap(this.errorHandler) .transform(decode(type)); } @@ -552,13 +577,4 @@ private Function, Publisher> encode(Object request } }); } - - // FIXME make this configurable - @SuppressWarnings("unchecked") - private T handleError(T cause) { - if (cause instanceof reactor.ipc.netty.http.client.HttpClientException) { - return (T) new HttpClientException((reactor.ipc.netty.http.client.HttpClientException) cause); - } - return (T) new HttpException(cause); - } } diff --git a/src/main/java/com/rabbitmq/http/client/ReactorNettyClientOptions.java b/src/main/java/com/rabbitmq/http/client/ReactorNettyClientOptions.java new file mode 100644 index 00000000..a7775a88 --- /dev/null +++ b/src/main/java/com/rabbitmq/http/client/ReactorNettyClientOptions.java @@ -0,0 +1,74 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.rabbitmq.http.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import reactor.core.publisher.Mono; +import reactor.ipc.netty.http.client.HttpClient; + +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * + */ +public class ReactorNettyClientOptions { + + private Supplier client; + + private Supplier objectMapper; + + private Mono token; + + private Function errorHandler; + + public Supplier objectMapper() { + return objectMapper; + } + + public ReactorNettyClientOptions objectMapper(Supplier objectMapper) { + this.objectMapper = objectMapper; + return this; + } + + public Mono token() { + return token; + } + + public ReactorNettyClientOptions token(Mono token) { + this.token = token; + return this; + } + + public Function errorHandler() { + return errorHandler; + } + + public ReactorNettyClientOptions errorHandler(Function errorHandler) { + this.errorHandler = errorHandler; + return this; + } + + public Supplier client() { + return client; + } + + public ReactorNettyClientOptions client(Supplier client) { + this.client = client; + return this; + } +}