Skip to content

Commit

Permalink
feat: add Reactor support
Browse files Browse the repository at this point in the history
  • Loading branch information
aoudiamoncef committed Apr 22, 2021
1 parent f72c3af commit 9d4f795
Show file tree
Hide file tree
Showing 8 changed files with 349 additions and 0 deletions.
17 changes: 17 additions & 0 deletions apollo-reactor-support/api.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Signature format: 3.0
package com.apollographql.apollo.reactor {

public final class KotlinExtensions {
}

public class ReactorApollo {
method public static <T> reactor.core.publisher.Mono<com.apollographql.apollo.api.Response<T!>!> from(com.apollographql.apollo.ApolloQueryWatcher<T!>);
method public static <T> reactor.core.publisher.Mono<com.apollographql.apollo.api.Response<T!>!> from(com.apollographql.apollo.ApolloCall<T!>);
method public static reactor.core.publisher.Mono<java.lang.Void> from(com.apollographql.apollo.ApolloPrefetch);
method public static <T> reactor.core.publisher.Flux<com.apollographql.apollo.api.Response<T!>!> from(com.apollographql.apollo.ApolloSubscriptionCall<T!>);
method public static <T> reactor.core.publisher.Flux<com.apollographql.apollo.api.Response<T!>!> from(com.apollographql.apollo.ApolloSubscriptionCall<T!>, reactor.core.publisher.FluxSink.OverflowStrategy);
method public static <T> reactor.core.publisher.Mono<T!> from(com.apollographql.apollo.cache.normalized.ApolloStoreOperation<T!>);
}

}

15 changes: 15 additions & 0 deletions apollo-reactor-support/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
plugins {
`java-library`
kotlin("jvm")
}

dependencies {
add("implementation", project(":apollo-api"))
add("api", groovy.util.Eval.x(project, "x.dep.reactor.core"))
add("api", project(":apollo-runtime"))
}

tasks.withType<Javadoc> {
options.encoding = "UTF-8"
}

4 changes: 4 additions & 0 deletions apollo-reactor-support/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
POM_ARTIFACT_ID=apollo-reactor-support
POM_NAME=Apollo GraphQL Reactor Support
POM_DESCRIPTION=Apollo GraphQL Reactor bindings
POM_PACKAGING=jar
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package com.apollographql.apollo.reactor;

import com.apollographql.apollo.ApolloCall;
import com.apollographql.apollo.ApolloPrefetch;
import com.apollographql.apollo.ApolloQueryWatcher;
import com.apollographql.apollo.ApolloSubscriptionCall;
import com.apollographql.apollo.api.Response;
import com.apollographql.apollo.cache.normalized.ApolloStoreOperation;
import com.apollographql.apollo.exception.ApolloException;
import com.apollographql.apollo.internal.subscription.ApolloSubscriptionTerminatedException;
import com.apollographql.apollo.internal.util.Cancelable;
import org.jetbrains.annotations.NotNull;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

import static com.apollographql.apollo.api.internal.Utils.checkNotNull;

/**
* The ReactorApollo class provides methods for converting ApolloCall, ApolloPrefetch and ApolloWatcher types to Reactor sources.
*/
public class ReactorApollo {

private ReactorApollo() {
throw new AssertionError("This class cannot be instantiated");
}

/**
* Converts an {@link ApolloQueryWatcher} to an asynchronous Mono.
*
* @param watcher the ApolloQueryWatcher to convert.
* @param <T> the value type
* @return the converted Mono
* @throws NullPointerException if watcher == null
*/
@NotNull
public static <T> Mono<Response<T>> from(@NotNull final ApolloQueryWatcher<T> watcher) {
checkNotNull(watcher, "watcher == null");
return Mono.create(sink -> {
ApolloQueryWatcher<T> clone = watcher.clone();
cancelOnMonoDisposed(sink, clone);
clone.enqueueAndWatch(new ApolloCall.Callback<T>() {
@Override public void onResponse(@NotNull Response<T> response) {
sink.success(response);
}

@Override public void onFailure(@NotNull ApolloException e) {
Exceptions.throwIfFatal(e);
sink.error(e);
}
});
});
}

/**
* Converts an {@link ApolloCall} to an {@link Mono}. The number ofemissions this Mono will have is based on the {@link
* com.apollographql.apollo.fetcher.ResponseFetcher} used with the call.
*
* @param call the ApolloCall to convert
* @param <T> the value type.
* @return the converted Mono
* @throws NullPointerException if originalCall == null
*/
@NotNull
public static <T> Mono<Response<T>> from(@NotNull final ApolloCall<T> call) {
checkNotNull(call, "call == null");
return Mono.create(sink -> {
ApolloCall<T> clone = call.toBuilder().build();
cancelOnMonoDisposed(sink, clone);
clone.enqueue(new ApolloCall.Callback<T>() {
@Override public void onResponse(@NotNull Response<T> response) {
sink.success(response);
}

@Override public void onFailure(@NotNull ApolloException e) {
Exceptions.throwIfFatal(e);
sink.error(e);
}

@Override public void onStatusEvent(@NotNull ApolloCall.StatusEvent event) {
if (event == ApolloCall.StatusEvent.COMPLETED) {
sink.success();
}
}
});
});
}

/**
* Converts an {@link ApolloPrefetch} to a synchronous Mono<Void>
*
* @param prefetch the ApolloPrefetch to convert
* @return the converted Mono<Void>
* @throws NullPointerException if prefetch == null
*/
@NotNull
public static Mono<Void> from(@NotNull final ApolloPrefetch prefetch) {
checkNotNull(prefetch, "prefetch == null");
return Mono.create(sink -> {
ApolloPrefetch clone = prefetch.clone();
cancelOnMonoDisposed(sink, clone);
clone.enqueue(new ApolloPrefetch.Callback() {
@Override public void onSuccess() {
sink.success();
}

@Override public void onFailure(@NotNull ApolloException e) {
Exceptions.throwIfFatal(e);
sink.error(e);
}
});
});
}

@NotNull
public static <T> Flux<Response<T>> from(@NotNull ApolloSubscriptionCall<T> call) {
return from(call, FluxSink.OverflowStrategy.LATEST);
}

@NotNull
public static <T> Flux<Response<T>> from(@NotNull final ApolloSubscriptionCall<T> call,
@NotNull FluxSink.OverflowStrategy backpressureStrategy) {
checkNotNull(call, "originalCall == null");
checkNotNull(backpressureStrategy, "backpressureStrategy == null");
return Flux.create(sink -> {
ApolloSubscriptionCall<T> clone = call.clone();
cancelOnFluxDisposed(sink, clone);
clone.execute(
new ApolloSubscriptionCall.Callback<T>() {
@Override public void onResponse(@NotNull Response<T> response) {
if (!sink.isCancelled()) {
sink.next(response);
}
}

@Override public void onFailure(@NotNull ApolloException e) {
Exceptions.throwIfFatal(e);
if (!sink.isCancelled()) {
sink.error(e);
}
}

@Override public void onCompleted() {
if (!sink.isCancelled()) {
sink.complete();
}
}

@Override public void onTerminated() {
onFailure(new ApolloSubscriptionTerminatedException("Subscription server unexpectedly terminated "
+ "connection"));
}

@Override public void onConnected() {
}
}
);
}, backpressureStrategy);
}

/**
* Converts an {@link ApolloStoreOperation} to a Mono.
*
* @param operation the ApolloStoreOperation to convert
* @param <T> the value type
* @return the converted Mono
*/
@NotNull
public static <T> Mono<T> from(@NotNull final ApolloStoreOperation<T> operation) {
checkNotNull(operation, "operation == null");
return Mono.create(sink -> {
operation.enqueue(new ApolloStoreOperation.Callback<T>() {
@Override
public void onSuccess(T result) {
sink.success(result);
}

@Override
public void onFailure(@NotNull Throwable t) {
sink.error(t);
}
});
});

}

private static <T> void cancelOnMonoDisposed(MonoSink<T> sink, final Cancelable cancelable) {
sink.onCancel(getReactorDisposable(cancelable));
sink.onDispose(getReactorDisposable(cancelable));
}

private static <T> void cancelOnFluxDisposed(FluxSink<T> sink, final Cancelable cancelable) {
sink.onCancel(getReactorDisposable(cancelable));
sink.onDispose(getReactorDisposable(cancelable));
}

private static Disposable getReactorDisposable(final Cancelable cancelable) {
return new Disposable() {
@Override public void dispose() {
cancelable.cancel();
}

@Override public boolean isDisposed() {
return cancelable.isCanceled();
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
@file:Suppress("NOTHING_TO_INLINE")
@file:JvmName("KotlinExtensions")

package com.apollographql.apollo.reactor

import com.apollographql.apollo.ApolloCall
import com.apollographql.apollo.ApolloClient
import com.apollographql.apollo.ApolloMutationCall
import com.apollographql.apollo.ApolloPrefetch
import com.apollographql.apollo.ApolloQueryCall
import com.apollographql.apollo.ApolloQueryWatcher
import com.apollographql.apollo.ApolloSubscriptionCall
import com.apollographql.apollo.api.Mutation
import com.apollographql.apollo.api.Operation
import com.apollographql.apollo.api.Query
import com.apollographql.apollo.api.Response
import com.apollographql.apollo.api.Subscription
import com.apollographql.apollo.cache.normalized.ApolloStoreOperation
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
import reactor.core.publisher.Mono

@JvmSynthetic
inline fun ApolloPrefetch.reactor(): Mono<Void> =
ReactorApollo.from(this)

@JvmSynthetic
inline fun <T> ApolloStoreOperation<T>.reactor(): Mono<T> =
ReactorApollo.from(this)

@JvmSynthetic
inline fun <T> ApolloQueryWatcher<T>.reactor(): Mono<Response<T>> =
ReactorApollo.from(this)

@JvmSynthetic
inline fun <T> ApolloCall<T>.reactor(): Mono<Response<T>> =
ReactorApollo.from(this)

@JvmSynthetic
inline fun <T> ApolloSubscriptionCall<T>.reactor(
backpressureStrategy: FluxSink.OverflowStrategy = FluxSink.OverflowStrategy.LATEST
): Flux<Response<T>> = ReactorApollo.from(this, backpressureStrategy)

/**
* Creates a new [ApolloQueryCall] call and then converts it to an [Mono].
*
* The number of emissions this Mono will have is based on the
* [com.apollographql.apollo.fetcher.ResponseFetcher] used with the call.
*/
@JvmSynthetic
inline fun <D : Operation.Data, T, V : Operation.Variables> ApolloClient.reactorQuery(
query: Query<D, T, V>,
configure: ApolloQueryCall<T>.() -> ApolloQueryCall<T> = { this }
): Mono<Response<T>> = query(query).configure().reactor()

/**
* Creates a new [ApolloMutationCall] call and then converts it to a [Mono].
*/
@JvmSynthetic
inline fun <D : Operation.Data, T, V : Operation.Variables> ApolloClient.reactorMutate(
mutation: Mutation<D, T, V>,
configure: ApolloMutationCall<T>.() -> ApolloMutationCall<T> = { this }
): Mono<Response<T>> = mutate(mutation).configure().reactor()

/**
* Creates a new [ApolloMutationCall] call and then converts it to a [Mono].
*
* Provided optimistic updates will be stored in [com.apollographql.apollo.cache.normalized.ApolloStore]
* immediately before mutation execution. Any [ApolloQueryWatcher] dependent on the changed cache records will
* be re-fetched.
*/
@JvmSynthetic
inline fun <D : Operation.Data, T, V : Operation.Variables> ApolloClient.reactorMutate(
mutation: Mutation<D, T, V>,
withOptimisticUpdates: D,
configure: ApolloMutationCall<T>.() -> ApolloMutationCall<T> = { this }
): Mono<Response<T>> = mutate(mutation, withOptimisticUpdates).configure().reactor()

/**
* Creates the [ApolloPrefetch] by wrapping the operation object inside and then converts it to a [Mono].
*/
@JvmSynthetic
inline fun <D : Operation.Data, T, V : Operation.Variables> ApolloClient.reactorPrefetch(
operation: Operation<D, T, V>
): Mono<Void> = prefetch(operation).reactor()

/**
* Creates a new [ApolloSubscriptionCall] call and then converts it to a [Flux].
*
* Back-pressure strategy can be provided via [backpressureStrategy] parameter. The default value is [FluxSink.OverflowStrategy.LATEST]
*/
@JvmSynthetic
inline fun <D : Operation.Data, T, V : Operation.Variables> ApolloClient.reactorSubscribe(
subscription: Subscription<D, T, V>,
backpressureStrategy: FluxSink.OverflowStrategy = FluxSink.OverflowStrategy.LATEST
): Flux<Response<T>> = subscribe(subscription).reactor(backpressureStrategy)
1 change: 1 addition & 0 deletions buildSrc/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repositories {
gradlePluginPortal()
google()
mavenCentral()
mavenLocal()
}

dependencies {
Expand Down
4 changes: 4 additions & 0 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def versions = [
okHttp4 : '4.9.0',
okHttp : '3.12.11', // Keep this to keep supporting older Android devices
okio : '2.9.0',
reactor : '3.4.5',
rxjava : '2.2.20',
rxjava3 : '3.0.7',
rxandroid : '2.0.1',
Expand Down Expand Up @@ -86,6 +87,9 @@ ext.dep = [
java : "com.squareup:javapoet:$versions.javaPoet",
kotlin: "com.squareup:kotlinpoet:$versions.kotlinPoet",
],
reactor : [
core : "io.projectreactor:reactor-core:$versions.reactor",
],
rx : [
java : "io.reactivex.rxjava2:rxjava:$versions.rxjava",
java3 : "io.reactivex.rxjava3:rxjava:$versions.rxjava3",
Expand Down
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ include("apollo-runtime")
include("apollo-api")
include("apollo-rx2-support")
include("apollo-rx3-support")
include("apollo-reactor-support")
include("apollo-coroutines-support")
include("apollo-http-cache")
include("apollo-http-cache-api")
Expand Down

0 comments on commit 9d4f795

Please sign in to comment.