The features of this library (and more) have been integrated into RxJava 3 proper starting with version 3.0.0-RC7.
RxJava 3 interop library for supporting Java 8 features such as Optional, Stream and CompletableFuture.
compile 'com.github.akarnokd:rxjava3-jdk8-interop:3.0.0-RC6'
compile 'com.github.akarnokd:rxjava2-jdk8-interop:0.3.7'
Javadocs: https://akarnokd.github.com/RxJavaJdk8Interop/javadoc/index.html
The main entry points are:
FlowableInterop
ObservableInterop
SingleInterop
MaybeInterop
CompletableInterop
Note that java.util.stream.Stream
can be consumed at most once and only
synchronously.
Stream<T> stream = ...
Flowable<T> flow = FlowableInterop.fromStream(stream);
Observable<T> obs = ObservableInterop.fromStream(stream);
Optional<T> opt = ...
Flowable<T> flow = FlowableInterop.fromOptional(opt);
Observable<T> obs = ObservableInterop.fromOptional(opt);
Note that cancelling the Subscription won't cancel the CompletionStage
.
CompletionStage<T> cs = ...
Flowable<T> flow = FlowableInterop.fromFuture(cs);
Observable<T> flow = ObservableInterop.fromFuture(cs);
Flowable.range(1, 10)
.compose(FlowableInterop.collect(Collectors.toList()))
.test()
.assertResult(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
CompletionStage<Integer> cs = Flowable.just(1)
.delay(1, TimeUnit.SECONDS)
// return first
.to(FlowableInterop.first());
// return single
// .to(FlowableInterop.single());
// return last
// .to(FlowableInterop.last());
cs.whenComplete((v, e) -> {
System.out.println(v);
System.out.println(e);
});
CompletionStage<Integer> cs = Single.just(1)
.delay(1, TimeUnit.SECONDS)
.to(SingleInterop.get());
cs.whenComplete((v, e) -> {
System.out.println(v);
System.out.println(e);
});
CompletionStage<Integer> cs = Maybe.just(1)
.delay(1, TimeUnit.SECONDS)
.to(MaybeInterop.get());
cs.whenComplete((v, e) -> {
System.out.println(v);
System.out.println(e);
});
CompletionStage<Void> cs = Completable.complete()
.delay(1, TimeUnit.SECONDS)
.to(CompletableInterop.await());
cs.whenComplete((v, e) -> {
System.out.println(v);
System.out.println(e);
});
This is a blocking operation
Optional<Integer> opt = Flowable.just(1)
.to(FlowableInterop.firstElement());
System.out.println(opt.map(v -> v + 1).orElse(-1));
This is a blocking operation. Closing the stream will cancel the RxJava sequence.
Flowable.range(1, 10)
.to(FlowableInterop.toStream())
.parallel()
.map(v -> v + 1)
.forEach(System.out::println);
Note that since consuming a stream is practically blocking, there is no need
for a maxConcurrency
parameter.
Flowable.range(1, 5)
.compose(FlowableInterop.flatMapStream(v -> Arrays.asList(v, v + 1).stream()))
.test()
.assertResult(1, 2, 2, 3, 3, 4, 4, 5, 5, 6);
Flowable.range(1, 5)
.compose(FlowableInterop.mapOptional(v -> v % 2 == 0 ? Optional.of(v) : Optional.empty()))
.test()
.assertResult(2, 4);