Skip to content

Commit

Permalink
Added example and test for reative async api.
Browse files Browse the repository at this point in the history
Updated maven dep for reactive http client and rxjava
Code clean up.
  • Loading branch information
sagaofsilence committed Apr 14, 2024
1 parent a8afcc7 commit 94b2ad6
Show file tree
Hide file tree
Showing 4 changed files with 300 additions and 0 deletions.
17 changes: 17 additions & 0 deletions apache-http-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,27 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5-cache</artifactId>
<version>${apache-http-client.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents.core5/httpcore5-reactive -->
<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5-reactive</artifactId>
<version>5.2.4</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.reactivex.rxjava3/rxjava -->
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.1.8</version>
</dependency>

<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-launcher</artifactId>
Expand All @@ -49,6 +65,7 @@
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package io.refactoring.http5.client.example.async.helper;

import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate;
import java.util.concurrent.*;
import javax.net.ssl.SSLContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.hc.client5.http.async.methods.*;
import org.apache.hc.client5.http.config.TlsConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.impl.async.MinimalH2AsyncClient;
import org.apache.hc.client5.http.impl.async.MinimalHttpAsyncClient;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.*;
import org.apache.hc.core5.http.config.Http1Config;
import org.apache.hc.core5.http.message.StatusLine;
import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.ssl.SSLContexts;
import org.apache.hc.core5.util.Timeout;

@Slf4j
public class AsyncClientH2Multiplexing {

public static void main(final String[] args) throws Exception {

final IOReactorConfig ioReactorConfig =
IOReactorConfig.custom().setSoTimeout(Timeout.ofSeconds(5)).build();

final CloseableHttpAsyncClient client =
HttpAsyncClients.custom().setIOReactorConfig(ioReactorConfig).setH2Config(H2Config.DEFAULT).build();

client.start();

final HttpHost target = new HttpHost("nghttp2.org");
final String requestUri = "/httpbin/post";

final AsyncRequestProducer requestProducer =
new BasicRequestProducer(
Method.POST,
target,
requestUri,
AsyncEntityProducers.create("stuff", ContentType.TEXT_PLAIN));
System.out.println("Executing POST request to " + requestUri);
final Future<Message<HttpResponse, String>> future =
client.execute(
requestProducer,
new BasicResponseConsumer<String>(new StringAsyncEntityConsumer()),
new FutureCallback<Message<HttpResponse, String>>() {

@Override
public void completed(final Message<HttpResponse, String> message) {
System.out.println(requestUri + "->" + message.getHead().getCode());
System.out.println(message.getBody());
}

@Override
public void failed(final Exception ex) {
System.out.println(requestUri + "->" + ex);
}

@Override
public void cancelled() {
System.out.println(requestUri + " cancelled");
}
});
future.get();

System.out.println("Shutting down");
client.close(CloseMode.GRACEFUL);
}

private static void test1()
throws NoSuchAlgorithmException,
KeyManagementException,
KeyStoreException,
InterruptedException,
ExecutionException,
TimeoutException {
MinimalH2AsyncClient minimalH2AsyncClient =
HttpAsyncClients.createHttp2Minimal(
H2Config.DEFAULT, IOReactorConfig.DEFAULT, getTlsStrategy());

final MinimalHttpAsyncClient client =
HttpAsyncClients.createMinimal(
H2Config.DEFAULT,
Http1Config.DEFAULT,
IOReactorConfig.DEFAULT,
PoolingAsyncClientConnectionManagerBuilder.create()
.setTlsStrategy(getTlsStrategy())
.setDefaultTlsConfig(
TlsConfig.custom().setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2).build())
.build());

client.start();

final HttpHost target = new HttpHost("https", "nghttp2.org");
final Future<AsyncClientEndpoint> leaseFuture = client.lease(target, null);
final AsyncClientEndpoint endpoint = leaseFuture.get(30, TimeUnit.SECONDS);
try {
final String[] requestUris =
new String[] {"/httpbin/ip", "/httpbin/user-agent", "/httpbin/headers"};

final CountDownLatch latch = new CountDownLatch(requestUris.length);
for (final String requestUri : requestUris) {
final SimpleHttpRequest request =
SimpleRequestBuilder.get().setHttpHost(target).setPath(requestUri).build();

System.out.println("Executing request " + request);
endpoint.execute(
SimpleRequestProducer.create(request),
SimpleResponseConsumer.create(),
new FutureCallback<SimpleHttpResponse>() {

@Override
public void completed(final SimpleHttpResponse response) {
latch.countDown();
System.out.println(request + "->" + new StatusLine(response));
System.out.println(response.getBodyText());
}

@Override
public void failed(final Exception ex) {
latch.countDown();
System.out.println(request + "->" + ex);
}

@Override
public void cancelled() {
latch.countDown();
System.out.println(request + " cancelled");
}
});
}
latch.await();
} finally {
endpoint.releaseAndReuse();
}

System.out.println("Shutting down");
client.close(CloseMode.GRACEFUL);
}

private static TlsStrategy getTlsStrategy()
throws NoSuchAlgorithmException, KeyManagementException, KeyStoreException {
// Trust standard CA and those trusted by our custom strategy
final SSLContext sslContext =
SSLContexts.custom()
// Custom TrustStrategy implementations are intended for verification
// of certificates whose CA is not trusted by the system, and where specifying
// a custom truststore containing the certificate chain is not an option.
.loadTrustMaterial(
(chain, authType) -> {
// Please note that validation of the server certificate without validation
// of the entire certificate chain in this example is preferred to completely
// disabling trust verification, however, this still potentially allows
// for man-in-the-middle attacks.
final X509Certificate cert = chain[0];
log.warn(
"Bypassing SSL certificate validation for {}",
cert.getSubjectX500Principal().getName());
return true;
})
.build();

return ClientTlsStrategyBuilder.create().setSslContext(sslContext).build();
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package io.refactoring.http5.client.example.async.helper;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.refactoring.http5.client.example.RequestProcessingException;
import io.refactoring.http5.client.example.config.interceptor.UserResponseAsyncExecChainHandler;
import io.refactoring.http5.client.example.helper.BaseHttpRequestHelper;
import io.refactoring.http5.client.example.model.User;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
Expand Down Expand Up @@ -32,9 +38,12 @@
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.net.URIBuilder;
import org.apache.hc.core5.reactive.ReactiveEntityProducer;
import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.ssl.SSLContexts;
import org.apache.hc.core5.util.Timeout;
import org.reactivestreams.Publisher;

/** Handles HTTP requests for user entities. It uses built in types for HTTP processing. */
@Slf4j
Expand Down Expand Up @@ -508,4 +517,75 @@ public Map<Integer, String> executeRequestsWithInterceptors(

return userResponseMap;
}

/**
* Creates user with reactive processing.
*
* @param minimalHttpClient the minimal http client
* @param userName the user name
* @param userJob the user job
* @param scheme the scheme
* @param hostname the hostname
* @return the user with reactive processing
* @throws RequestProcessingException the request processing exception
*/
public User createUserWithReactiveProcessing(
final MinimalHttpAsyncClient minimalHttpClient,
final String userName,
final String userJob,
String scheme,
String hostname)
throws RequestProcessingException {
try {
final HttpHost httpHost = new HttpHost(scheme, hostname);
final URI uri = new URIBuilder(httpHost.toURI() + "/api/users/").build();

final Map<String, String> payload = new HashMap<>();
payload.put("name", userName);
payload.put("job", userJob);
ObjectMapper objectMapper = new ObjectMapper();
final String payloadStr = objectMapper.writeValueAsString(payload);
final byte[] bs = payloadStr.getBytes(StandardCharsets.UTF_8);

final ReactiveEntityProducer reactiveEntityProducer =
new ReactiveEntityProducer(
Flowable.just(ByteBuffer.wrap(bs)), bs.length, ContentType.TEXT_PLAIN, null);

final BasicRequestProducer requestProducer =
new BasicRequestProducer("POST", uri, reactiveEntityProducer);

final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();

final Future<Void> requestFuture = minimalHttpClient.execute(requestProducer, consumer, null);

final Message<HttpResponse, Publisher<ByteBuffer>> streamingResponse =
consumer.getResponseFuture().get();
log.debug("Head: {}", streamingResponse.getHead());
for (final Header header : streamingResponse.getHead().getHeaders()) {
log.debug("Header : {}", header);
}

StringBuilder result = new StringBuilder();
Observable.fromPublisher(streamingResponse.getBody())
.map(
byteBuffer -> {
final byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return new String(bytes);
})
.materialize()
.forEach(
stringNotification -> {
final String value = stringNotification.getValue();
if (value != null) {
result.append(value);
}
});

requestFuture.get(1, TimeUnit.MINUTES);
return objectMapper.readerFor(User.class).readValue(result.toString());
} catch (Exception e) {
throw new RequestProcessingException("Failed to create user. Error: " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.assertj.core.api.Assertions.assertThat;

import io.refactoring.http5.client.example.model.User;
import java.util.List;
import java.util.Map;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
Expand Down Expand Up @@ -165,4 +166,24 @@ void getUserWithInterceptors() {
Assertions.fail("Failed to execute HTTP request.", e);
}
}

@Test
void createUserWithReactiveProcessing() {
MinimalHttpAsyncClient minimalHttpAsyncClient = null;
try {
minimalHttpAsyncClient = userHttpRequestHelper.startMinimalHttp1AsyncClient();

final User responseBody =
userHttpRequestHelper.createUserWithReactiveProcessing(
minimalHttpAsyncClient, "RxMan", "Manager", "https", "reqres.in");

// verify
assertThat(responseBody).extracting("id", "createdAt").isNotNull();

} catch (Exception e) {
Assertions.fail("Failed to execute HTTP request.", e);
} finally {
userHttpRequestHelper.stopMinimalHttpAsyncClient(minimalHttpAsyncClient);
}
}
}

0 comments on commit 94b2ad6

Please sign in to comment.