-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create new async client based on Reactor Netty #123
Changes from 5 commits
fe4dc9a
f60bbbe
904f3a3
572c756
931dd01
885a409
5b880c8
35a3b29
2c685c7
4706c0c
2074451
9fa8644
8092e34
1ed4fe0
b3695fa
79046be
89100f0
5b28558
a0cd820
ef54e3e
cd5a4cd
b4db01e
8405d17
4ba9632
120b00b
fb5558a
52f4a49
95bea10
cd31689
99e6379
411b91c
49ac638
265925f
a1031ef
f31b4f2
0c1c63b
304e5b1
1910b03
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,202 @@ | ||
/* | ||
* Copyright 2018 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.rabbitmq.http.client; | ||
|
||
import com.fasterxml.jackson.annotation.JsonInclude; | ||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.databind.DeserializationFeature; | ||
import com.fasterxml.jackson.databind.MapperFeature; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.rabbitmq.http.client.domain.NodeInfo; | ||
import com.rabbitmq.http.client.domain.OverviewResponse; | ||
import com.rabbitmq.http.client.domain.PolicyInfo; | ||
import io.netty.handler.codec.http.HttpHeaderNames; | ||
import io.netty.handler.codec.http.HttpHeaderValues; | ||
import io.netty.handler.codec.json.JsonObjectDecoder; | ||
import org.reactivestreams.Publisher; | ||
import org.springframework.util.StringUtils; | ||
import reactor.core.Exceptions; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Mono; | ||
import reactor.ipc.netty.http.client.HttpClient; | ||
import reactor.ipc.netty.http.client.HttpClientRequest; | ||
import reactor.ipc.netty.http.client.HttpClientResponse; | ||
|
||
import java.io.UnsupportedEncodingException; | ||
import java.lang.reflect.Array; | ||
import java.net.URI; | ||
import java.net.URLEncoder; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.Base64; | ||
import java.util.function.Function; | ||
|
||
/** | ||
* | ||
*/ | ||
public class ReactorNettyClient { | ||
|
||
private static final int MAX_PAYLOAD_SIZE = 100 * 1024 * 1024; | ||
|
||
private static final String ENCODING_CHARSET = "UTF-8"; | ||
|
||
private final String rootUrl; | ||
|
||
private final ObjectMapper objectMapper = new ObjectMapper(); | ||
|
||
private final HttpClient client; | ||
|
||
private final Mono<String> token; | ||
|
||
public ReactorNettyClient(String url) { | ||
this(urlWithoutCredentials(url), | ||
StringUtils.split(URI.create(url).getUserInfo(), ":")[0], | ||
StringUtils.split(URI.create(url).getUserInfo(), ":")[1]); | ||
} | ||
|
||
public ReactorNettyClient(String url, String username, String password) { | ||
rootUrl = url; | ||
// FIXME make Jackson ObjectMapper configurable | ||
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); | ||
objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT); | ||
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); | ||
objectMapper.disable(MapperFeature.DEFAULT_VIEW_INCLUSION); | ||
|
||
// FIXME make URL configurable | ||
URI uri = URI.create(url); | ||
client = HttpClient.create(options -> options.host(uri.getHost()).port(uri.getPort())); | ||
|
||
// FIXME make Authentication header value configurable (default being Basic) | ||
this.token = createBasicAuthenticationToken(username, password); | ||
|
||
// FIXME make SSLContext configurable when using TLS | ||
} | ||
|
||
private static String urlWithoutCredentials(String url) { | ||
URI url1 = URI.create(url); | ||
return StringUtils.replace(url, url1.getUserInfo() + "@", ""); | ||
} | ||
|
||
protected Mono<String> createBasicAuthenticationToken(String username, String password) { | ||
return Mono.fromSupplier(() -> { | ||
String credentials = username + ":" + password; | ||
byte[] credentialsAsBytes = credentials.getBytes(StandardCharsets.ISO_8859_1); | ||
byte[] encodedBytes = Base64.getEncoder().encode(credentialsAsBytes); | ||
String encodedCredentials = new String(encodedBytes, StandardCharsets.ISO_8859_1); | ||
return "Basic " + encodedCredentials; | ||
}).cache(); | ||
} | ||
|
||
public Mono<OverviewResponse> getOverview() { | ||
return doGetMono(OverviewResponse.class, "overview"); | ||
} | ||
|
||
public Flux<NodeInfo> getNodes() { | ||
return doGetFlux(NodeInfo.class, "nodes"); | ||
} | ||
|
||
public Mono<HttpClientResponse> declarePolicy(String vhost, String name, PolicyInfo info) { | ||
return doPost(info, "policies", vhost, name); | ||
} | ||
|
||
private HttpClientRequest disableChunkTransfer(HttpClientRequest request) { | ||
return request.chunkedTransfer(false); | ||
} | ||
|
||
public Flux<PolicyInfo> getPolicies() { | ||
return doGetFlux(PolicyInfo.class, "policies"); | ||
} | ||
|
||
public Mono<HttpClientResponse> deletePolicy(String vhost, String name) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does sounds like Thanks There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Either that or a boolean (whether the delete succeeded). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question. This applies to empty responses (POST, PUT, and DELETE). Different options:
I'd go with the wrapper, with only the status code and headers, we could make it more elaborate on demand. |
||
return doDelete("policies", vhost, name); | ||
} | ||
|
||
private <T> Mono<T> doGetMono(Class<T> type, String... pathSegments) { | ||
return client.get(uri(pathSegments), request -> Mono.just(request) | ||
.transform(this::addAuthorization) | ||
.flatMap(pRequest -> pRequest.send())).transform(decode(type)); | ||
} | ||
|
||
private <T> Flux<T> doGetFlux(Class<T> type, String... pathSegments) { | ||
return (Flux<T>) doGetMono(Array.newInstance(type, 0).getClass(), pathSegments).flatMapMany(items -> Flux.fromArray((Object[]) items)); | ||
} | ||
|
||
private Mono<HttpClientResponse> doPost(Object body, String... pathSegments) { | ||
return client.put(uri(pathSegments), request -> Mono.just(request) | ||
.transform(this::addAuthorization) | ||
.map(this::disableChunkTransfer) | ||
.transform(encode(body))); | ||
} | ||
|
||
private Mono<HttpClientResponse> doDelete(String... pathSegments) { | ||
return client.delete(uri(pathSegments), request -> Mono.just(request) | ||
.transform(this::addAuthorization) | ||
.flatMap(HttpClientRequest::send) | ||
); | ||
} | ||
|
||
private Mono<HttpClientRequest> addAuthorization(Mono<HttpClientRequest> request) { | ||
return Mono | ||
.zip(request, token) | ||
.map(tuple -> tuple.getT1().addHeader(HttpHeaderNames.AUTHORIZATION, tuple.getT2())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess the
would enough and much efficient. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh! I see what you mean with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, but you use Am I missing anything else ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, you're right, the current default implementation is static and may be the one used 99% of the time. I plan to add a way to provide a custom There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good. It is now. Thank you! |
||
} | ||
|
||
private String uri(String... pathSegments) { | ||
StringBuilder builder = new StringBuilder(); | ||
if (pathSegments != null && pathSegments.length > 0) { | ||
for (String pathSegment : pathSegments) { | ||
try { | ||
builder.append("/"); | ||
builder.append(URLEncoder.encode(pathSegment, ENCODING_CHARSET)); | ||
} catch (UnsupportedEncodingException e) { | ||
// FIXME exception handling | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
} | ||
return rootUrl + builder.toString(); | ||
} | ||
|
||
private <T> Function<Mono<HttpClientResponse>, Flux<T>> decode(Class<T> type) { | ||
return inbound -> | ||
inbound.flatMapMany(response -> response.addHandler(new JsonObjectDecoder(MAX_PAYLOAD_SIZE)).receive().asByteArray() | ||
.map(payload -> { | ||
try { | ||
return objectMapper.readValue(payload, type); | ||
} catch (Throwable t) { | ||
// FIXME exception handling | ||
throw new RuntimeException(t); | ||
} | ||
}) | ||
); | ||
} | ||
|
||
private Function<Mono<HttpClientRequest>, Publisher<Void>> encode(Object requestPayload) { | ||
return outbound -> outbound | ||
.flatMapMany(request -> { | ||
try { | ||
byte[] bytes = objectMapper.writeValueAsBytes(requestPayload); | ||
|
||
return request | ||
.header(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON) | ||
.header(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(bytes.length)) | ||
.sendByteArray(Mono.just(bytes)); | ||
} catch (JsonProcessingException e) { | ||
throw Exceptions.propagate(e); | ||
} | ||
}); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
/* | ||
* Copyright 2018 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.rabbitmq.http.client | ||
|
||
import com.rabbitmq.client.Connection | ||
import com.rabbitmq.client.ConnectionFactory | ||
import com.rabbitmq.http.client.domain.NodeInfo | ||
import com.rabbitmq.http.client.domain.PolicyInfo | ||
import spock.lang.Specification | ||
|
||
class ReactorNettyClientSpec extends Specification { | ||
|
||
protected ReactorNettyClient client | ||
|
||
private final ConnectionFactory cf = initializeConnectionFactory() | ||
|
||
protected static ConnectionFactory initializeConnectionFactory() { | ||
final cf = new ConnectionFactory() | ||
cf.setAutomaticRecoveryEnabled(false) | ||
cf | ||
} | ||
|
||
def setup() { | ||
client = newLocalhostNodeClient() | ||
} | ||
|
||
protected static ReactorNettyClient newLocalhostNodeClient() { | ||
new ReactorNettyClient("http://guest:guest@localhost:15672/api") | ||
} | ||
|
||
def "GET /api/overview"() { | ||
when: "client requests GET /api/overview" | ||
final conn = openConnection() | ||
final ch = conn.createChannel() | ||
1000.times { ch.basicPublish("", "", null, null) } | ||
|
||
def res = client.getOverview().block() | ||
def xts = res.getExchangeTypes().collect { it.getName() } | ||
|
||
then: "the response is converted successfully" | ||
res.getNode().startsWith("rabbit@") | ||
res.getErlangVersion() != null | ||
|
||
final msgStats = res.getMessageStats() | ||
msgStats.basicPublish >= 0 | ||
msgStats.publisherConfirm >= 0 | ||
msgStats.basicDeliver >= 0 | ||
msgStats.basicReturn >= 0 | ||
|
||
final qTotals = res.getQueueTotals() | ||
qTotals.messages >= 0 | ||
qTotals.messagesReady >= 0 | ||
qTotals.messagesUnacknowledged >= 0 | ||
|
||
final oTotals = res.getObjectTotals() | ||
oTotals.connections >= 0 | ||
oTotals.channels >= 0 | ||
oTotals.exchanges >= 0 | ||
oTotals.queues >= 0 | ||
oTotals.consumers >= 0 | ||
|
||
res.listeners.size() >= 1 | ||
res.contexts.size() >= 1 | ||
|
||
xts.contains("topic") | ||
xts.contains("fanout") | ||
xts.contains("direct") | ||
xts.contains("headers") | ||
|
||
cleanup: | ||
if (conn.isOpen()) { | ||
conn.close() | ||
} | ||
} | ||
|
||
def "GET /api/nodes"() { | ||
when: "client retrieves a list of cluster nodes" | ||
final res = client.getNodes() | ||
final node = res.blockFirst() | ||
|
||
then: "the list is returned" | ||
res.count().block() >= 1 | ||
verifyNode(node) | ||
} | ||
|
||
def "GET /api/policies"() { | ||
given: "at least one policy was declared" | ||
final v = "/" | ||
final s = "hop.test" | ||
final d = new HashMap<String, Object>() | ||
final p = ".*" | ||
d.put("ha-mode", "all") | ||
client.declarePolicy(v, s, new PolicyInfo(p, 0, null, d)).block() | ||
|
||
when: "client lists policies" | ||
final xs = awaitEventPropagation({ client.getPolicies() }) | ||
|
||
then: "a list of policies is returned" | ||
final x = xs.blockFirst() | ||
verifyPolicyInfo(x) | ||
|
||
cleanup: | ||
client.deletePolicy(v, s).block() | ||
} | ||
|
||
protected Connection openConnection() { | ||
this.cf.newConnection() | ||
} | ||
|
||
protected static void verifyNode(NodeInfo node) { | ||
assert node != null | ||
assert node.name != null | ||
assert node.socketsUsed <= node.socketsTotal | ||
assert node.erlangProcessesUsed <= node.erlangProcessesTotal | ||
assert node.erlangRunQueueLength >= 0 | ||
assert node.memoryUsed <= node.memoryLimit | ||
} | ||
|
||
protected static void verifyPolicyInfo(PolicyInfo x) { | ||
assert x.name != null | ||
assert x.vhost != null | ||
assert x.pattern != null | ||
assert x.definition != null | ||
assert x.applyTo != null | ||
} | ||
|
||
/** | ||
* Statistics tables in the server are updated asynchronously, | ||
* in particular starting with rabbitmq/rabbitmq-management#236, | ||
* so in some cases we need to wait before GET'ing e.g. a newly opened connection. | ||
*/ | ||
protected static Object awaitEventPropagation(Closure callback) { | ||
if (callback) { | ||
int n = 0 | ||
def result = callback() | ||
def hasElements = false | ||
while (!hasElements && n < 10000) { | ||
Thread.sleep(100) | ||
n += 100 | ||
result = callback() | ||
hasElements = result?.hasElements().block() | ||
} | ||
assert n < 10000 | ||
result | ||
} else { | ||
Thread.sleep(1000) | ||
null | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, sorry for coming me back again:, but my thought was that you are going to get rid of any Spring dependencies at all. No? Am I to nit-picking?
Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, we still want to get rid of Spring dependencies for this client, I'll use something there. Thanks!