diff --git a/build.gradle b/build.gradle index c2f72233..c1891c9d 100644 --- a/build.gradle +++ b/build.gradle @@ -37,8 +37,9 @@ ext { groovyVersion = '2.4.13' jacksonVersion = '2.9.4' spockVersion = "1.1-groovy-2.4" - reactorVersion = "Bismuth-SR3" + reactorVersion = "Bismuth-SR8" springVersion = '5.0.3.RELEASE' + reactorNettyVersion = '0.7.6.RELEASE' linkHomepage = 'https://github.com/rabbitmq/hop' linkCi = 'https://build.spring.io/browse/RMQ' @@ -63,7 +64,7 @@ dependencies { compile "org.springframework:spring-webflux:$springVersion" compile "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion" compile "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion" - compile "io.projectreactor.ipc:reactor-netty" + compile "io.projectreactor.ipc:reactor-netty:$reactorNettyVersion" testCompile "org.codehaus.groovy:groovy-all:$groovyVersion" testCompile "org.spockframework:spock-core:$spockVersion" diff --git a/src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java b/src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java index 41425de7..ea7a5178 100644 --- a/src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java +++ b/src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java @@ -36,6 +36,7 @@ import reactor.ipc.netty.http.client.HttpClientRequest; import reactor.ipc.netty.http.client.HttpClientResponse; +import java.lang.reflect.Array; import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.function.Function; @@ -47,7 +48,7 @@ public class ReactorNettyClient { private static final int MAX_PAYLOAD_SIZE = 100 * 1024 * 1024; - private final UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromHttpUrl("http://localhost:15672/api"); + private final Mono root = Mono.just("http://localhost:15672/api"); private final ObjectMapper objectMapper = new ObjectMapper(); @@ -61,6 +62,7 @@ public ReactorNettyClient() { objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); objectMapper.disable(MapperFeature.DEFAULT_VIEW_INCLUSION); + UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromHttpUrl(root.block()); client = HttpClient.create(options -> options.host(uriBuilder.build().getHost()).port(uriBuilder.build().getPort())); String credentials = "guest" + ":" + "guest"; @@ -70,59 +72,63 @@ public ReactorNettyClient() { authorizationHeader = "Basic " + encodedCredentials; } - static Function, Publisher> encode(ObjectMapper objectMapper, Object requestPayload) { - return outbound -> outbound - .flatMapMany(request -> { - try { - byte[] bytes = objectMapper.writeValueAsBytes(requestPayload); - - return request - .header(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON) - .header(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(bytes.length)) - .sendByteArray(Mono.just(bytes)); - } catch (JsonProcessingException e) { - throw Exceptions.propagate(e); - } - }); - } - public Mono getOverview() { - return client.get(uriBuilder.cloneBuilder().pathSegment("overview").toUriString(), request -> request - .addHeader(HttpHeaderNames.AUTHORIZATION, authorizationHeader) - .send()).transform(decode(OverviewResponse.class)); + return doGetMono(builder -> builder.pathSegment("overview"), OverviewResponse.class); } public Flux getNodes() { - return client.get(uriBuilder.cloneBuilder().pathSegment("nodes").toUriString(), request -> request - .addHeader(HttpHeaderNames.AUTHORIZATION, authorizationHeader) - .send()).transform(decode(NodeInfo[].class)).flatMapMany(nodes -> Flux.fromArray(nodes)); + return doGetFlux(builder -> builder.pathSegment("nodes"), NodeInfo.class); } public Mono declarePolicy(String vhost, String name, PolicyInfo info) { - return client.put(uriBuilder.cloneBuilder() - .pathSegment("policies", "{vhost}", "{name}") - .build(vhost, name).toASCIIString(), request -> { - request.addHeader(HttpHeaderNames.AUTHORIZATION, authorizationHeader) - .chunkedTransfer(false) - .failOnClientError(false) - .failOnServerError(false); - - return Mono.just(request).transform(encode(objectMapper, info)); - }); + return doPost(builder -> builder.pathSegment("policies", vhost, name), info); + } + + private HttpClientRequest disableChunkTransfer(HttpClientRequest request) { + return request.chunkedTransfer(false); } public Flux getPolicies() { - return client.get(uriBuilder.cloneBuilder().pathSegment("policies").toUriString(), request -> request - .addHeader(HttpHeaderNames.AUTHORIZATION, authorizationHeader) - .send()).transform(decode(PolicyInfo[].class)).flatMapMany(nodes -> Flux.fromArray(nodes)); + return doGetFlux(builder -> builder.pathSegment("policies"), PolicyInfo.class); } public Mono deletePolicy(String vhost, String name) { - return client.delete(uriBuilder.cloneBuilder() - .pathSegment("policies", "{vhost}", "{name}") - .build(vhost, name).toASCIIString(), request -> request - .addHeader(HttpHeaderNames.AUTHORIZATION, authorizationHeader) - .send()); + return doDelete(builder -> builder.pathSegment("policies", vhost, name)); + } + + private Mono doGetMono(Function uriTransformer, Class type) { + return client.get(uri(uriTransformer), request -> Mono.just(request) + .map(this::addAuthentication) + .flatMap(pRequest -> pRequest.send())).transform(decode(type)); + } + + private Flux doGetFlux(Function uriTransformer, Class type) { + return (Flux) doGetMono(uriTransformer, Array.newInstance(type, 0).getClass()).flatMapMany(items -> Flux.fromArray((Object[]) items)); + } + + private Mono doPost(Function uriTransformer, Object body) { + return client.put(uri(uriTransformer), request -> Mono.just(request) + .map(this::addAuthentication) + .map(this::disableChunkTransfer) + .transform(encode(body))); + } + + private Mono doDelete(Function uriTransformer) { + return client.delete(uri(uriTransformer), request -> Mono.just(request) + .map(this::addAuthentication) + .flatMap(HttpClientRequest::send) + ); + } + + private HttpClientRequest addAuthentication(HttpClientRequest request) { + return request.addHeader(HttpHeaderNames.AUTHORIZATION, authorizationHeader); + } + + private String uri(Function transformer) { + return root.map(UriComponentsBuilder::fromHttpUrl) + .map(transformer) + .map(builder -> builder.build().encode()) + .block().toUriString(); } private Function, Flux> decode(Class type) { @@ -138,4 +144,20 @@ private Function, Flux> decode(Class type) { }) ); } + + private Function, Publisher> encode(Object requestPayload) { + return outbound -> outbound + .flatMapMany(request -> { + try { + byte[] bytes = objectMapper.writeValueAsBytes(requestPayload); + + return request + .header(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON) + .header(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(bytes.length)) + .sendByteArray(Mono.just(bytes)); + } catch (JsonProcessingException e) { + throw Exceptions.propagate(e); + } + }); + } }