Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev #7

Merged
merged 3 commits into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ Maven
<dependency>
<groupId>com.github.saasquatch</groupId>
<artifactId>apache-client5-reactive</artifactId>
<version>0.0.5</version>
<version>0.0.6</version>
</dependency>
```

Gradle

```gradle
implementation 'com.github.saasquatch:apache-client5-reactive:0.0.5'
implementation 'com.github.saasquatch:apache-client5-reactive:0.0.6'
```

## License
Expand Down
15 changes: 8 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.saasquatch</groupId>
<artifactId>apache-client5-reactive</artifactId>
<version>0.0.5-SNAPSHOT</version>
<version>0.0.6-SNAPSHOT</version>
<packaging>jar</packaging>

<name>apache-client5-reactive</name>
Expand All @@ -19,7 +20,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<junit.version>5.8.1</junit.version>
<junit.version>5.9.1</junit.version>
</properties>

<dependencies>
Expand All @@ -38,17 +39,17 @@
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.1.2</version>
<version>5.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5-reactive</artifactId>
<version>5.1.1</version>
<version>5.2</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.1.2</version>
<version>3.1.5</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
Expand All @@ -58,7 +59,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.32</version>
<version>2.0.5</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.saasquatch.client5reactive;

import io.reactivex.rxjava3.core.MaybeEmitter;
import java.util.concurrent.CancellationException;
import org.apache.hc.core5.concurrent.FutureCallback;
import io.reactivex.rxjava3.core.MaybeEmitter;

/**
* Utilities for {@link FutureCallback}s. Not public.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,44 @@
/**
* Thin wrapper around Apache {@link HttpAsyncClient} to expose
* <a href="https://www.reactive-streams.org/">Reactive Streams</a> interfaces.<br>
* The methods in this interface aim to mirror the ones in {@link HttpAsyncClient} and {@link
* CloseableHttpAsyncClient}.
* The methods in this interface aim to mirror the ones in {@link HttpAsyncClient} and
* {@link CloseableHttpAsyncClient}.
*
* @author sli
* @see HttpReactiveClients
*/
public interface HttpReactiveClient {

/**
* Execute the given request. This method is equivalent to {@link HttpAsyncClient#execute(AsyncRequestProducer,
* AsyncResponseConsumer, HandlerFactory, HttpContext, FutureCallback)}. If the {@link Future}
* produced by the equivalent {@link HttpAsyncClient} method completes with {@code null}, then the
* returning {@link Publisher} of this method will complete with no element.
* Execute the given request. This method is equivalent to
* {@link HttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory,
* HttpContext, FutureCallback)}. If the {@link Future} produced by the equivalent
* {@link HttpAsyncClient} method completes with {@code null}, then the returning
* {@link Publisher} of this method will complete with no element.
*/
<T> Publisher<T> execute(@Nonnull AsyncRequestProducer requestProducer,
@Nonnull AsyncResponseConsumer<T> responseConsumer,
@Nullable HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
@Nullable HttpContext context);

/**
* Convenience method for {@link #execute(AsyncRequestProducer, AsyncResponseConsumer,
* HandlerFactory, HttpContext)}, equivalent to {@link CloseableHttpAsyncClient#execute(AsyncRequestProducer,
* AsyncResponseConsumer, HttpContext, FutureCallback)}
* Convenience method for
* {@link #execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory, HttpContext)},
* equivalent to
* {@link CloseableHttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer,
* HttpContext, FutureCallback)}
*/
default <T> Publisher<T> execute(@Nonnull AsyncRequestProducer requestProducer,
@Nonnull AsyncResponseConsumer<T> responseConsumer, @Nullable HttpContext context) {
return execute(requestProducer, responseConsumer, null, context);
}

/**
* Convenience method for {@link #execute(AsyncRequestProducer, AsyncResponseConsumer,
* HandlerFactory, HttpContext)}, equivalent to {@link CloseableHttpAsyncClient#execute(AsyncRequestProducer,
* AsyncResponseConsumer, FutureCallback)}.
* Convenience method for
* {@link #execute(AsyncRequestProducer, AsyncResponseConsumer, HandlerFactory, HttpContext)},
* equivalent to
* {@link CloseableHttpAsyncClient#execute(AsyncRequestProducer, AsyncResponseConsumer,
* FutureCallback)}.
*/
default <T> Publisher<T> execute(@Nonnull AsyncRequestProducer requestProducer,
@Nonnull AsyncResponseConsumer<T> responseConsumer) {
Expand All @@ -64,62 +69,63 @@ default <T> Publisher<T> execute(@Nonnull AsyncRequestProducer requestProducer,

/**
* Execute a simple in-memory request and get a simple in-memory response. This method is
* equivalent to {@link CloseableHttpAsyncClient#execute(SimpleHttpRequest, HttpContext,
* FutureCallback)}. The returning {@link Publisher} completes with exactly 1 element.
* equivalent to
* {@link CloseableHttpAsyncClient#execute(SimpleHttpRequest, HttpContext, FutureCallback)}. The
* returning {@link Publisher} completes with exactly 1 element.
*/
default Publisher<SimpleHttpResponse> execute(@Nonnull SimpleHttpRequest request,
@Nullable HttpContext context) {
return execute(SimpleRequestProducer.create(request), SimpleResponseConsumer.create(), context);
}

/**
* Convenience method for {@link #execute(SimpleHttpRequest, HttpContext)}, equivalent to {@link
* CloseableHttpAsyncClient#execute(SimpleHttpRequest, FutureCallback)}.
* Convenience method for {@link #execute(SimpleHttpRequest, HttpContext)}, equivalent to
* {@link CloseableHttpAsyncClient#execute(SimpleHttpRequest, FutureCallback)}.
*/
default Publisher<SimpleHttpResponse> execute(@Nonnull SimpleHttpRequest request) {
return execute(request, null);
}

/**
* Execute the given request and get a streaming response body as a {@link Publisher} of {@link
* ByteBuffer}s. The returning {@link Publisher} completes with exactly 1 element. The {@link
* Publisher} within the returning {@link Publisher} may contain 0 to n elements.
* Execute the given request and get a streaming response body as a {@link Publisher} of
* {@link ByteBuffer}s. The returning {@link Publisher} completes with exactly 1 element. The
* {@link Publisher} within the returning {@link Publisher} may contain 0 to n elements.
*/
Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamingExecute(
@Nonnull AsyncRequestProducer requestProducer,
@Nullable HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
@Nullable HttpContext context);

/**
* Convenience method for {@link #streamingExecute(AsyncRequestProducer, HandlerFactory,
* HttpContext)}
* Convenience method for
* {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)}
*/
default Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamingExecute(
@Nonnull AsyncRequestProducer requestProducer, @Nullable HttpContext context) {
return streamingExecute(requestProducer, null, context);
}

/**
* Convenience method for {@link #streamingExecute(AsyncRequestProducer, HandlerFactory,
* HttpContext)}
* Convenience method for
* {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)}
*/
default Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamingExecute(
@Nonnull AsyncRequestProducer requestProducer) {
return streamingExecute(requestProducer, null);
}

/**
* Execute a simple in-memory request and get a streaming response. Convenience method for {@link
* #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)}
* Execute a simple in-memory request and get a streaming response. Convenience method for
* {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)}
*/
default Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamingExecute(
@Nonnull SimpleHttpRequest request, @Nullable HttpContext context) {
return streamingExecute(SimpleRequestProducer.create(request), context);
}

/**
* Convenience method for {@link #streamingExecute(AsyncRequestProducer, HandlerFactory,
* HttpContext)}
* Convenience method for
* {@link #streamingExecute(AsyncRequestProducer, HandlerFactory, HttpContext)}
*/
default Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamingExecute(
@Nonnull SimpleHttpRequest request) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.saasquatch.client5reactive;

import io.reactivex.rxjava3.core.Maybe;
import java.nio.ByteBuffer;
import java.util.Objects;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.hc.client5.http.async.HttpAsyncClient;
import org.apache.hc.core5.http.HttpResponse;
Expand All @@ -13,9 +15,6 @@
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
import org.reactivestreams.Publisher;
import io.reactivex.rxjava3.core.Maybe;

import javax.annotation.Nonnull;

/**
* Concrete implementation of {@link HttpReactiveClient}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ private HttpReactiveClients() {}
/**
* Create a {@link HttpReactiveClient} from a given {@link HttpAsyncClient}. Note that the created
* {@link HttpReactiveClient} is simply a wrapper of the {@link HttpAsyncClient} and does not
* support state management, so you'll need to manage the state of the given {@link
* HttpAsyncClient} yourself by calling {@link CloseableHttpAsyncClient#start()}, {@link
* CloseableHttpAsyncClient#close()}, etc.
* support state management, so you'll need to manage the state of the given
* {@link HttpAsyncClient} yourself by calling {@link CloseableHttpAsyncClient#start()},
* {@link CloseableHttpAsyncClient#close()}, etc.
*/
@Nonnull
public static HttpReactiveClient create(@Nonnull HttpAsyncClient httpAsyncClient) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.saasquatch.client5reactive;

import io.reactivex.rxjava3.core.Maybe;
import java.io.IOException;
import java.util.concurrent.CancellationException;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.junit.jupiter.api.Test;
import io.reactivex.rxjava3.core.Maybe;

public class FutureCallbacksTests {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public static void main(String[] args) throws Exception {
final HttpReactiveClient reactiveClient = HttpReactiveClients.create(asyncClient);
// Execute a simple in-memory request
Single.fromPublisher(
reactiveClient.execute(SimpleRequestBuilder.get("https://www.example.com").build()))
reactiveClient.execute(SimpleRequestBuilder.get("https://www.example.com").build()))
.doOnSuccess(response -> {
// Get the response status and body in memory
System.out.println(response.getCode());
Expand All @@ -35,7 +35,7 @@ public static void main(String[] args) throws Exception {
// Execute a streaming request
// In this case, the request is a simple in-memory request without a request body
Single.fromPublisher(reactiveClient.streamingExecute(
SimpleRequestBuilder.get("https://www.example.com").build()))
SimpleRequestBuilder.get("https://www.example.com").build()))
.flatMapPublisher(message -> {
// Get the status before subscribing to the streaming body
System.out.println(message.getHead().getCode());
Expand Down