From fe4dc9adebf7247f54cd4ced7a7d7f3a8368fc70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 24 Apr 2018 14:03:45 +0200 Subject: [PATCH] Kick off Reactor Netty based client This commit covers the typical use cases (basic authentication, serialization, deserialization, GET/PUT/DELETE methods). Some refactoring will be needed to re-use and clean up code (e.g. URI creation, authentication). TLS support will also be needed. [#157001487] References #122 --- .../http/client/ReactorNettyClient.java | 141 +++++++++++++++ .../http/client/ReactorNettyClientSpec.groovy | 165 ++++++++++++++++++ 2 files changed, 306 insertions(+) create mode 100644 src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java create mode 100644 src/test/groovy/com/rabbitmq/http/client/ReactorNettyClientSpec.groovy diff --git a/src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java b/src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java new file mode 100644 index 00000000..41425de7 --- /dev/null +++ b/src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java @@ -0,0 +1,141 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.rabbitmq.http.client; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.rabbitmq.http.client.domain.NodeInfo; +import com.rabbitmq.http.client.domain.OverviewResponse; +import com.rabbitmq.http.client.domain.PolicyInfo; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.json.JsonObjectDecoder; +import org.reactivestreams.Publisher; +import org.springframework.web.util.UriComponentsBuilder; +import reactor.core.Exceptions; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.ipc.netty.http.client.HttpClient; +import reactor.ipc.netty.http.client.HttpClientRequest; +import reactor.ipc.netty.http.client.HttpClientResponse; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.function.Function; + +/** + * + */ +public class ReactorNettyClient { + + private static final int MAX_PAYLOAD_SIZE = 100 * 1024 * 1024; + + private final UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromHttpUrl("http://localhost:15672/api"); + + private final ObjectMapper objectMapper = new ObjectMapper(); + + private final HttpClient client; + + private final String authorizationHeader; + + public ReactorNettyClient() { + objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT); + objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + objectMapper.disable(MapperFeature.DEFAULT_VIEW_INCLUSION); + + client = HttpClient.create(options -> options.host(uriBuilder.build().getHost()).port(uriBuilder.build().getPort())); + + String credentials = "guest" + ":" + "guest"; + byte[] credentialsAsBytes = credentials.getBytes(StandardCharsets.ISO_8859_1); + byte[] encodedBytes = Base64.getEncoder().encode(credentialsAsBytes); + String encodedCredentials = new String(encodedBytes, StandardCharsets.ISO_8859_1); + authorizationHeader = "Basic " + encodedCredentials; + } + + static Function, Publisher> encode(ObjectMapper objectMapper, Object requestPayload) { + return outbound -> outbound + .flatMapMany(request -> { + try { + byte[] bytes = objectMapper.writeValueAsBytes(requestPayload); + + return request + .header(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON) + .header(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(bytes.length)) + .sendByteArray(Mono.just(bytes)); + } catch (JsonProcessingException e) { + throw Exceptions.propagate(e); + } + }); + } + + public Mono getOverview() { + return client.get(uriBuilder.cloneBuilder().pathSegment("overview").toUriString(), request -> request + .addHeader(HttpHeaderNames.AUTHORIZATION, authorizationHeader) + .send()).transform(decode(OverviewResponse.class)); + } + + public Flux getNodes() { + return client.get(uriBuilder.cloneBuilder().pathSegment("nodes").toUriString(), request -> request + .addHeader(HttpHeaderNames.AUTHORIZATION, authorizationHeader) + .send()).transform(decode(NodeInfo[].class)).flatMapMany(nodes -> Flux.fromArray(nodes)); + } + + public Mono declarePolicy(String vhost, String name, PolicyInfo info) { + return client.put(uriBuilder.cloneBuilder() + .pathSegment("policies", "{vhost}", "{name}") + .build(vhost, name).toASCIIString(), request -> { + request.addHeader(HttpHeaderNames.AUTHORIZATION, authorizationHeader) + .chunkedTransfer(false) + .failOnClientError(false) + .failOnServerError(false); + + return Mono.just(request).transform(encode(objectMapper, info)); + }); + } + + public Flux getPolicies() { + return client.get(uriBuilder.cloneBuilder().pathSegment("policies").toUriString(), request -> request + .addHeader(HttpHeaderNames.AUTHORIZATION, authorizationHeader) + .send()).transform(decode(PolicyInfo[].class)).flatMapMany(nodes -> Flux.fromArray(nodes)); + } + + public Mono deletePolicy(String vhost, String name) { + return client.delete(uriBuilder.cloneBuilder() + .pathSegment("policies", "{vhost}", "{name}") + .build(vhost, name).toASCIIString(), request -> request + .addHeader(HttpHeaderNames.AUTHORIZATION, authorizationHeader) + .send()); + } + + private Function, Flux> decode(Class type) { + return inbound -> + inbound.flatMapMany(response -> response.addHandler(new JsonObjectDecoder(MAX_PAYLOAD_SIZE)).receive().asByteArray() + .map(payload -> { + try { + return objectMapper.readValue(payload, type); + } catch (Throwable t) { + // FIXME exception handling + throw new RuntimeException(t); + } + }) + ); + } +} diff --git a/src/test/groovy/com/rabbitmq/http/client/ReactorNettyClientSpec.groovy b/src/test/groovy/com/rabbitmq/http/client/ReactorNettyClientSpec.groovy new file mode 100644 index 00000000..5b5dd710 --- /dev/null +++ b/src/test/groovy/com/rabbitmq/http/client/ReactorNettyClientSpec.groovy @@ -0,0 +1,165 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.rabbitmq.http.client + +import com.rabbitmq.client.Connection +import com.rabbitmq.client.ConnectionFactory +import com.rabbitmq.http.client.domain.NodeInfo +import com.rabbitmq.http.client.domain.PolicyInfo +import spock.lang.Specification + +class ReactorNettyClientSpec extends Specification { + + protected ReactorNettyClient client + + private final ConnectionFactory cf = initializeConnectionFactory() + + protected static ConnectionFactory initializeConnectionFactory() { + final cf = new ConnectionFactory() + cf.setAutomaticRecoveryEnabled(false) + cf + } + + def setup() { + client = newLocalhostNodeClient() + } + + protected static ReactorNettyClient newLocalhostNodeClient() { + new ReactorNettyClient() + } + + def "GET /api/overview"() { + when: "client requests GET /api/overview" + final conn = openConnection() + final ch = conn.createChannel() + 1000.times { ch.basicPublish("", "", null, null) } + + def res = client.getOverview().block() + def xts = res.getExchangeTypes().collect { it.getName() } + + then: "the response is converted successfully" + res.getNode().startsWith("rabbit@") + res.getErlangVersion() != null + + final msgStats = res.getMessageStats() + msgStats.basicPublish >= 0 + msgStats.publisherConfirm >= 0 + msgStats.basicDeliver >= 0 + msgStats.basicReturn >= 0 + + final qTotals = res.getQueueTotals() + qTotals.messages >= 0 + qTotals.messagesReady >= 0 + qTotals.messagesUnacknowledged >= 0 + + final oTotals = res.getObjectTotals() + oTotals.connections >= 0 + oTotals.channels >= 0 + oTotals.exchanges >= 0 + oTotals.queues >= 0 + oTotals.consumers >= 0 + + res.listeners.size() >= 1 + res.contexts.size() >= 1 + + xts.contains("topic") + xts.contains("fanout") + xts.contains("direct") + xts.contains("headers") + + cleanup: + if (conn.isOpen()) { + conn.close() + } + } + + def "GET /api/nodes"() { + when: "client retrieves a list of cluster nodes" + final res = client.getNodes() + final node = res.blockFirst() + + then: "the list is returned" + res.count().block() >= 1 + verifyNode(node) + } + + def "GET /api/policies"() { + given: "at least one policy was declared" + final v = "/" + final s = "hop.test" + final d = new HashMap() + final p = ".*" + d.put("ha-mode", "all") + client.declarePolicy(v, s, new PolicyInfo(p, 0, null, d)).block() + + when: "client lists policies" + final xs = awaitEventPropagation({ client.getPolicies() }) + + then: "a list of policies is returned" + final x = xs.blockFirst() + verifyPolicyInfo(x) + + cleanup: + client.deletePolicy(v, s).block() + } + + protected Connection openConnection() { + this.cf.newConnection() + } + + protected static void verifyNode(NodeInfo node) { + assert node != null + assert node.name != null + assert node.socketsUsed <= node.socketsTotal + assert node.erlangProcessesUsed <= node.erlangProcessesTotal + assert node.erlangRunQueueLength >= 0 + assert node.memoryUsed <= node.memoryLimit + } + + protected static void verifyPolicyInfo(PolicyInfo x) { + assert x.name != null + assert x.vhost != null + assert x.pattern != null + assert x.definition != null + assert x.applyTo != null + } + + /** + * Statistics tables in the server are updated asynchronously, + * in particular starting with rabbitmq/rabbitmq-management#236, + * so in some cases we need to wait before GET'ing e.g. a newly opened connection. + */ + protected static Object awaitEventPropagation(Closure callback) { + if (callback) { + int n = 0 + def result = callback() + def hasElements = false + while (!hasElements && n < 10000) { + Thread.sleep(100) + n += 100 + result = callback() + hasElements = result?.hasElements().block() + } + assert n < 10000 + result + } else { + Thread.sleep(1000) + null + } + } + +}