forked from apollographql/apollo-kotlin
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
11 changed files
with
452 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
// Signature format: 3.0 | ||
package com.apollographql.apollo.reactor { | ||
|
||
public final class KotlinExtensions { | ||
} | ||
|
||
public class ReactorApollo { | ||
method public static <T> reactor.core.publisher.Mono<com.apollographql.apollo.api.Response<T!>!> from(com.apollographql.apollo.ApolloQueryWatcher<T!>); | ||
method public static <T> reactor.core.publisher.Mono<com.apollographql.apollo.api.Response<T!>!> from(com.apollographql.apollo.ApolloCall<T!>); | ||
method public static reactor.core.publisher.Mono<java.lang.Void> from(com.apollographql.apollo.ApolloPrefetch); | ||
method public static <T> reactor.core.publisher.Flux<com.apollographql.apollo.api.Response<T!>!> from(com.apollographql.apollo.ApolloSubscriptionCall<T!>); | ||
method public static <T> reactor.core.publisher.Flux<com.apollographql.apollo.api.Response<T!>!> from(com.apollographql.apollo.ApolloSubscriptionCall<T!>, reactor.core.publisher.FluxSink.OverflowStrategy); | ||
method public static <T> reactor.core.publisher.Mono<T!> from(com.apollographql.apollo.cache.normalized.ApolloStoreOperation<T!>); | ||
} | ||
|
||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
plugins { | ||
`java-library` | ||
kotlin("jvm") | ||
} | ||
|
||
dependencies { | ||
add("implementation", project(":apollo-api")) | ||
add("api", groovy.util.Eval.x(project, "x.dep.reactor.core")) | ||
add("api", project(":apollo-runtime")) | ||
} | ||
|
||
tasks.withType<Javadoc> { | ||
options.encoding = "UTF-8" | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
POM_ARTIFACT_ID=apollo-reactor-support | ||
POM_NAME=Apollo GraphQL Reactor Support | ||
POM_DESCRIPTION=Apollo GraphQL Reactor bindings | ||
POM_PACKAGING=jar |
211 changes: 211 additions & 0 deletions
211
apollo-reactor-support/src/main/java/com/apollographql/apollo/reactor/ReactorApollo.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,211 @@ | ||
package com.apollographql.apollo.reactor; | ||
|
||
import com.apollographql.apollo.ApolloCall; | ||
import com.apollographql.apollo.ApolloPrefetch; | ||
import com.apollographql.apollo.ApolloQueryWatcher; | ||
import com.apollographql.apollo.ApolloSubscriptionCall; | ||
import com.apollographql.apollo.api.Response; | ||
import com.apollographql.apollo.cache.normalized.ApolloStoreOperation; | ||
import com.apollographql.apollo.exception.ApolloException; | ||
import com.apollographql.apollo.internal.subscription.ApolloSubscriptionTerminatedException; | ||
import com.apollographql.apollo.internal.util.Cancelable; | ||
import org.jetbrains.annotations.NotNull; | ||
import reactor.core.Disposable; | ||
import reactor.core.Exceptions; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.FluxSink; | ||
import reactor.core.publisher.Mono; | ||
import reactor.core.publisher.MonoSink; | ||
|
||
import static com.apollographql.apollo.api.internal.Utils.checkNotNull; | ||
|
||
/** | ||
* The ReactorApollo class provides methods for converting ApolloCall, ApolloPrefetch and ApolloWatcher types to Reactor sources. | ||
*/ | ||
public class ReactorApollo { | ||
|
||
private ReactorApollo() { | ||
throw new AssertionError("This class cannot be instantiated"); | ||
} | ||
|
||
/** | ||
* Converts an {@link ApolloQueryWatcher} to an asynchronous Mono. | ||
* | ||
* @param watcher the ApolloQueryWatcher to convert. | ||
* @param <T> the value type | ||
* @return the converted Mono | ||
* @throws NullPointerException if watcher == null | ||
*/ | ||
@NotNull | ||
public static <T> Mono<Response<T>> from(@NotNull final ApolloQueryWatcher<T> watcher) { | ||
checkNotNull(watcher, "watcher == null"); | ||
return Mono.create(sink -> { | ||
ApolloQueryWatcher<T> clone = watcher.clone(); | ||
cancelOnMonoDisposed(sink, clone); | ||
clone.enqueueAndWatch(new ApolloCall.Callback<T>() { | ||
@Override public void onResponse(@NotNull Response<T> response) { | ||
sink.success(response); | ||
} | ||
|
||
@Override public void onFailure(@NotNull ApolloException e) { | ||
Exceptions.throwIfFatal(e); | ||
sink.error(e); | ||
} | ||
}); | ||
}); | ||
} | ||
|
||
/** | ||
* Converts an {@link ApolloCall} to an {@link Mono}. The number ofemissions this Mono will have is based on the {@link | ||
* com.apollographql.apollo.fetcher.ResponseFetcher} used with the call. | ||
* | ||
* @param call the ApolloCall to convert | ||
* @param <T> the value type. | ||
* @return the converted Mono | ||
* @throws NullPointerException if originalCall == null | ||
*/ | ||
@NotNull | ||
public static <T> Mono<Response<T>> from(@NotNull final ApolloCall<T> call) { | ||
checkNotNull(call, "call == null"); | ||
return Mono.create(sink -> { | ||
ApolloCall<T> clone = call.toBuilder().build(); | ||
cancelOnMonoDisposed(sink, clone); | ||
clone.enqueue(new ApolloCall.Callback<T>() { | ||
@Override public void onResponse(@NotNull Response<T> response) { | ||
sink.success(response); | ||
} | ||
|
||
@Override public void onFailure(@NotNull ApolloException e) { | ||
Exceptions.throwIfFatal(e); | ||
sink.error(e); | ||
} | ||
|
||
@Override public void onStatusEvent(@NotNull ApolloCall.StatusEvent event) { | ||
if (event == ApolloCall.StatusEvent.COMPLETED) { | ||
sink.success(); | ||
} | ||
} | ||
}); | ||
}); | ||
} | ||
|
||
/** | ||
* Converts an {@link ApolloPrefetch} to a synchronous Mono<Void> | ||
* | ||
* @param prefetch the ApolloPrefetch to convert | ||
* @return the converted Mono<Void> | ||
* @throws NullPointerException if prefetch == null | ||
*/ | ||
@NotNull | ||
public static Mono<Void> from(@NotNull final ApolloPrefetch prefetch) { | ||
checkNotNull(prefetch, "prefetch == null"); | ||
return Mono.create(sink -> { | ||
ApolloPrefetch clone = prefetch.clone(); | ||
cancelOnMonoDisposed(sink, clone); | ||
clone.enqueue(new ApolloPrefetch.Callback() { | ||
@Override public void onSuccess() { | ||
sink.success(); | ||
} | ||
|
||
@Override public void onFailure(@NotNull ApolloException e) { | ||
Exceptions.throwIfFatal(e); | ||
sink.error(e); | ||
} | ||
}); | ||
}); | ||
} | ||
|
||
@NotNull | ||
public static <T> Flux<Response<T>> from(@NotNull ApolloSubscriptionCall<T> call) { | ||
return from(call, FluxSink.OverflowStrategy.LATEST); | ||
} | ||
|
||
@NotNull | ||
public static <T> Flux<Response<T>> from(@NotNull final ApolloSubscriptionCall<T> call, | ||
@NotNull FluxSink.OverflowStrategy backpressureStrategy) { | ||
checkNotNull(call, "originalCall == null"); | ||
checkNotNull(backpressureStrategy, "backpressureStrategy == null"); | ||
return Flux.create(sink -> { | ||
ApolloSubscriptionCall<T> clone = call.clone(); | ||
cancelOnFluxDisposed(sink, clone); | ||
clone.execute( | ||
new ApolloSubscriptionCall.Callback<T>() { | ||
@Override public void onResponse(@NotNull Response<T> response) { | ||
if (!sink.isCancelled()) { | ||
sink.next(response); | ||
} | ||
} | ||
|
||
@Override public void onFailure(@NotNull ApolloException e) { | ||
Exceptions.throwIfFatal(e); | ||
if (!sink.isCancelled()) { | ||
sink.error(e); | ||
} | ||
} | ||
|
||
@Override public void onCompleted() { | ||
if (!sink.isCancelled()) { | ||
sink.complete(); | ||
} | ||
} | ||
|
||
@Override public void onTerminated() { | ||
onFailure(new ApolloSubscriptionTerminatedException("Subscription server unexpectedly terminated " | ||
+ "connection")); | ||
} | ||
|
||
@Override public void onConnected() { | ||
} | ||
} | ||
); | ||
}, backpressureStrategy); | ||
} | ||
|
||
/** | ||
* Converts an {@link ApolloStoreOperation} to a Mono. | ||
* | ||
* @param operation the ApolloStoreOperation to convert | ||
* @param <T> the value type | ||
* @return the converted Mono | ||
*/ | ||
@NotNull | ||
public static <T> Mono<T> from(@NotNull final ApolloStoreOperation<T> operation) { | ||
checkNotNull(operation, "operation == null"); | ||
return Mono.create(sink -> { | ||
operation.enqueue(new ApolloStoreOperation.Callback<T>() { | ||
@Override | ||
public void onSuccess(T result) { | ||
sink.success(result); | ||
} | ||
|
||
@Override | ||
public void onFailure(@NotNull Throwable t) { | ||
sink.error(t); | ||
} | ||
}); | ||
}); | ||
|
||
} | ||
|
||
private static <T> void cancelOnMonoDisposed(MonoSink<T> sink, final Cancelable cancelable) { | ||
sink.onCancel(getReactorDisposable(cancelable)); | ||
sink.onDispose(getReactorDisposable(cancelable)); | ||
} | ||
|
||
private static <T> void cancelOnFluxDisposed(FluxSink<T> sink, final Cancelable cancelable) { | ||
sink.onCancel(getReactorDisposable(cancelable)); | ||
sink.onDispose(getReactorDisposable(cancelable)); | ||
} | ||
|
||
private static Disposable getReactorDisposable(final Cancelable cancelable) { | ||
return new Disposable() { | ||
@Override public void dispose() { | ||
cancelable.cancel(); | ||
} | ||
|
||
@Override public boolean isDisposed() { | ||
return cancelable.isCanceled(); | ||
} | ||
}; | ||
} | ||
} |
96 changes: 96 additions & 0 deletions
96
apollo-reactor-support/src/main/java/com/apollographql/apollo/reactor/ReactorExtensions.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
@file:Suppress("NOTHING_TO_INLINE") | ||
@file:JvmName("KotlinExtensions") | ||
|
||
package com.apollographql.apollo.reactor | ||
|
||
import com.apollographql.apollo.ApolloCall | ||
import com.apollographql.apollo.ApolloClient | ||
import com.apollographql.apollo.ApolloMutationCall | ||
import com.apollographql.apollo.ApolloPrefetch | ||
import com.apollographql.apollo.ApolloQueryCall | ||
import com.apollographql.apollo.ApolloQueryWatcher | ||
import com.apollographql.apollo.ApolloSubscriptionCall | ||
import com.apollographql.apollo.api.Mutation | ||
import com.apollographql.apollo.api.Operation | ||
import com.apollographql.apollo.api.Query | ||
import com.apollographql.apollo.api.Response | ||
import com.apollographql.apollo.api.Subscription | ||
import com.apollographql.apollo.cache.normalized.ApolloStoreOperation | ||
import reactor.core.publisher.Flux | ||
import reactor.core.publisher.FluxSink | ||
import reactor.core.publisher.Mono | ||
|
||
@JvmSynthetic | ||
inline fun ApolloPrefetch.reactor(): Mono<Void> = | ||
ReactorApollo.from(this) | ||
|
||
@JvmSynthetic | ||
inline fun <T> ApolloStoreOperation<T>.reactor(): Mono<T> = | ||
ReactorApollo.from(this) | ||
|
||
@JvmSynthetic | ||
inline fun <T> ApolloQueryWatcher<T>.reactor(): Mono<Response<T>> = | ||
ReactorApollo.from(this) | ||
|
||
@JvmSynthetic | ||
inline fun <T> ApolloCall<T>.reactor(): Mono<Response<T>> = | ||
ReactorApollo.from(this) | ||
|
||
@JvmSynthetic | ||
inline fun <T> ApolloSubscriptionCall<T>.reactor( | ||
backpressureStrategy: FluxSink.OverflowStrategy = FluxSink.OverflowStrategy.LATEST | ||
): Flux<Response<T>> = ReactorApollo.from(this, backpressureStrategy) | ||
|
||
/** | ||
* Creates a new [ApolloQueryCall] call and then converts it to an [Mono]. | ||
* | ||
* The number of emissions this Mono will have is based on the | ||
* [com.apollographql.apollo.fetcher.ResponseFetcher] used with the call. | ||
*/ | ||
@JvmSynthetic | ||
inline fun <D : Operation.Data, T, V : Operation.Variables> ApolloClient.reactorQuery( | ||
query: Query<D, T, V>, | ||
configure: ApolloQueryCall<T>.() -> ApolloQueryCall<T> = { this } | ||
): Mono<Response<T>> = query(query).configure().reactor() | ||
|
||
/** | ||
* Creates a new [ApolloMutationCall] call and then converts it to a [Mono]. | ||
*/ | ||
@JvmSynthetic | ||
inline fun <D : Operation.Data, T, V : Operation.Variables> ApolloClient.reactorMutate( | ||
mutation: Mutation<D, T, V>, | ||
configure: ApolloMutationCall<T>.() -> ApolloMutationCall<T> = { this } | ||
): Mono<Response<T>> = mutate(mutation).configure().reactor() | ||
|
||
/** | ||
* Creates a new [ApolloMutationCall] call and then converts it to a [Mono]. | ||
* | ||
* Provided optimistic updates will be stored in [com.apollographql.apollo.cache.normalized.ApolloStore] | ||
* immediately before mutation execution. Any [ApolloQueryWatcher] dependent on the changed cache records will | ||
* be re-fetched. | ||
*/ | ||
@JvmSynthetic | ||
inline fun <D : Operation.Data, T, V : Operation.Variables> ApolloClient.reactorMutate( | ||
mutation: Mutation<D, T, V>, | ||
withOptimisticUpdates: D, | ||
configure: ApolloMutationCall<T>.() -> ApolloMutationCall<T> = { this } | ||
): Mono<Response<T>> = mutate(mutation, withOptimisticUpdates).configure().reactor() | ||
|
||
/** | ||
* Creates the [ApolloPrefetch] by wrapping the operation object inside and then converts it to a [Mono]. | ||
*/ | ||
@JvmSynthetic | ||
inline fun <D : Operation.Data, T, V : Operation.Variables> ApolloClient.reactorPrefetch( | ||
operation: Operation<D, T, V> | ||
): Mono<Void> = prefetch(operation).reactor() | ||
|
||
/** | ||
* Creates a new [ApolloSubscriptionCall] call and then converts it to a [Flux]. | ||
* | ||
* Back-pressure strategy can be provided via [backpressureStrategy] parameter. The default value is [FluxSink.OverflowStrategy.LATEST] | ||
*/ | ||
@JvmSynthetic | ||
inline fun <D : Operation.Data, T, V : Operation.Variables> ApolloClient.reactorSubscribe( | ||
subscription: Subscription<D, T, V>, | ||
backpressureStrategy: FluxSink.OverflowStrategy = FluxSink.OverflowStrategy.LATEST | ||
): Flux<Response<T>> = subscribe(subscription).reactor(backpressureStrategy) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ repositories { | |
gradlePluginPortal() | ||
google() | ||
mavenCentral() | ||
mavenLocal() | ||
} | ||
|
||
dependencies { | ||
|
Oops, something went wrong.