Skip to content

Commit

Permalink
Clean code of ReactorNettyClient for re-use
Browse files Browse the repository at this point in the history
[#157001487]

References #122
  • Loading branch information
acogoluegnes committed Apr 24, 2018
1 parent fe4dc9a commit f60bbbe
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 43 deletions.
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ ext {
groovyVersion = '2.4.13'
jacksonVersion = '2.9.4'
spockVersion = "1.1-groovy-2.4"
reactorVersion = "Bismuth-SR3"
reactorVersion = "Bismuth-SR8"
springVersion = '5.0.3.RELEASE'
reactorNettyVersion = '0.7.6.RELEASE'

linkHomepage = 'https://github.com/rabbitmq/hop'
linkCi = 'https://build.spring.io/browse/RMQ'
Expand All @@ -63,7 +64,7 @@ dependencies {
compile "org.springframework:spring-webflux:$springVersion"
compile "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion"
compile "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
compile "io.projectreactor.ipc:reactor-netty"
compile "io.projectreactor.ipc:reactor-netty:$reactorNettyVersion"

testCompile "org.codehaus.groovy:groovy-all:$groovyVersion"
testCompile "org.spockframework:spock-core:$spockVersion"
Expand Down
104 changes: 63 additions & 41 deletions src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import reactor.ipc.netty.http.client.HttpClientRequest;
import reactor.ipc.netty.http.client.HttpClientResponse;

import java.lang.reflect.Array;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.function.Function;
Expand All @@ -47,7 +48,7 @@ public class ReactorNettyClient {

private static final int MAX_PAYLOAD_SIZE = 100 * 1024 * 1024;

private final UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromHttpUrl("http://localhost:15672/api");
private final Mono<String> root = Mono.just("http://localhost:15672/api");

private final ObjectMapper objectMapper = new ObjectMapper();

Expand All @@ -61,6 +62,7 @@ public ReactorNettyClient() {
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.disable(MapperFeature.DEFAULT_VIEW_INCLUSION);

UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromHttpUrl(root.block());
client = HttpClient.create(options -> options.host(uriBuilder.build().getHost()).port(uriBuilder.build().getPort()));

String credentials = "guest" + ":" + "guest";
Expand All @@ -70,59 +72,63 @@ public ReactorNettyClient() {
authorizationHeader = "Basic " + encodedCredentials;
}

static Function<Mono<HttpClientRequest>, Publisher<Void>> encode(ObjectMapper objectMapper, Object requestPayload) {
return outbound -> outbound
.flatMapMany(request -> {
try {
byte[] bytes = objectMapper.writeValueAsBytes(requestPayload);

return request
.header(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
.header(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(bytes.length))
.sendByteArray(Mono.just(bytes));
} catch (JsonProcessingException e) {
throw Exceptions.propagate(e);
}
});
}

public Mono<OverviewResponse> getOverview() {
return client.get(uriBuilder.cloneBuilder().pathSegment("overview").toUriString(), request -> request
.addHeader(HttpHeaderNames.AUTHORIZATION, authorizationHeader)
.send()).transform(decode(OverviewResponse.class));
return doGetMono(builder -> builder.pathSegment("overview"), OverviewResponse.class);
}

public Flux<NodeInfo> getNodes() {
return client.get(uriBuilder.cloneBuilder().pathSegment("nodes").toUriString(), request -> request
.addHeader(HttpHeaderNames.AUTHORIZATION, authorizationHeader)
.send()).transform(decode(NodeInfo[].class)).flatMapMany(nodes -> Flux.fromArray(nodes));
return doGetFlux(builder -> builder.pathSegment("nodes"), NodeInfo.class);
}

public Mono<HttpClientResponse> declarePolicy(String vhost, String name, PolicyInfo info) {
return client.put(uriBuilder.cloneBuilder()
.pathSegment("policies", "{vhost}", "{name}")
.build(vhost, name).toASCIIString(), request -> {
request.addHeader(HttpHeaderNames.AUTHORIZATION, authorizationHeader)
.chunkedTransfer(false)
.failOnClientError(false)
.failOnServerError(false);

return Mono.just(request).transform(encode(objectMapper, info));
});
return doPost(builder -> builder.pathSegment("policies", vhost, name), info);
}

private HttpClientRequest disableChunkTransfer(HttpClientRequest request) {
return request.chunkedTransfer(false);
}

public Flux<PolicyInfo> getPolicies() {
return client.get(uriBuilder.cloneBuilder().pathSegment("policies").toUriString(), request -> request
.addHeader(HttpHeaderNames.AUTHORIZATION, authorizationHeader)
.send()).transform(decode(PolicyInfo[].class)).flatMapMany(nodes -> Flux.fromArray(nodes));
return doGetFlux(builder -> builder.pathSegment("policies"), PolicyInfo.class);
}

public Mono<HttpClientResponse> deletePolicy(String vhost, String name) {
return client.delete(uriBuilder.cloneBuilder()
.pathSegment("policies", "{vhost}", "{name}")
.build(vhost, name).toASCIIString(), request -> request
.addHeader(HttpHeaderNames.AUTHORIZATION, authorizationHeader)
.send());
return doDelete(builder -> builder.pathSegment("policies", vhost, name));
}

private <T> Mono<T> doGetMono(Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Class<T> type) {
return client.get(uri(uriTransformer), request -> Mono.just(request)
.map(this::addAuthentication)
.flatMap(pRequest -> pRequest.send())).transform(decode(type));
}

private <T> Flux<T> doGetFlux(Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Class<T> type) {
return (Flux<T>) doGetMono(uriTransformer, Array.newInstance(type, 0).getClass()).flatMapMany(items -> Flux.fromArray((Object[]) items));
}

private Mono<HttpClientResponse> doPost(Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Object body) {
return client.put(uri(uriTransformer), request -> Mono.just(request)
.map(this::addAuthentication)
.map(this::disableChunkTransfer)
.transform(encode(body)));
}

private Mono<HttpClientResponse> doDelete(Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer) {
return client.delete(uri(uriTransformer), request -> Mono.just(request)
.map(this::addAuthentication)
.flatMap(HttpClientRequest::send)
);
}

private HttpClientRequest addAuthentication(HttpClientRequest request) {
return request.addHeader(HttpHeaderNames.AUTHORIZATION, authorizationHeader);
}

private String uri(Function<UriComponentsBuilder, UriComponentsBuilder> transformer) {
return root.map(UriComponentsBuilder::fromHttpUrl)
.map(transformer)
.map(builder -> builder.build().encode())
.block().toUriString();
}

private <T> Function<Mono<HttpClientResponse>, Flux<T>> decode(Class<T> type) {
Expand All @@ -138,4 +144,20 @@ private <T> Function<Mono<HttpClientResponse>, Flux<T>> decode(Class<T> type) {
})
);
}

private Function<Mono<HttpClientRequest>, Publisher<Void>> encode(Object requestPayload) {
return outbound -> outbound
.flatMapMany(request -> {
try {
byte[] bytes = objectMapper.writeValueAsBytes(requestPayload);

return request
.header(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
.header(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(bytes.length))
.sendByteArray(Mono.just(bytes));
} catch (JsonProcessingException e) {
throw Exceptions.propagate(e);
}
});
}
}

0 comments on commit f60bbbe

Please sign in to comment.