Skip to content
This repository has been archived by the owner on Jun 27, 2018. It is now read-only.

Commit

Permalink
Merge pull request #2 from akarnokd/FixesFromRxJava2Since0421
Browse files Browse the repository at this point in the history
Fixes from RxJava since April 21
  • Loading branch information
akarnokd authored Jun 30, 2017
2 parents 9f413ef + 85557ce commit 3d04b80
Show file tree
Hide file tree
Showing 186 changed files with 2,910 additions and 752 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ before_install:
- chmod +x gradlew

script:
- export GRADLE_OPTS=-Xmx1024m
- ./gradlew assemble --stacktrace
- ./gradlew check jacocoFullReport --stacktrace

Expand Down
17 changes: 12 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ Preview for version 3 of RxJava, the modern ReactiveX style library for composin

```groovy
// shared components
compile "com.github.akarnokd:rxjava3-common:0.1.0"
compile "com.github.akarnokd:rxjava3-common:0.2.0"
// Flowable only
compile "com.github.akarnokd:rxjava3-flowable:0.1.0"
compile "com.github.akarnokd:rxjava3-flowable:0.2.0"
// Observable, Single, Maybe, Completable
compile "com.github.akarnokd:rxjava3-observable:0.1.0"
compile "com.github.akarnokd:rxjava3-observable:0.2.0"
// Interoperation between Flowable and the rest
compile "com.github.akarnokd:rxjava3-interop:0.1.0"
compile "com.github.akarnokd:rxjava3-interop:0.2.0"
```

## Structure
Expand All @@ -45,4 +45,11 @@ This is an unofficial preparation place for RxJava 3 where the major change is t
- dependencies: **rxjava3-commons**
- `rxjava3-interop`
- transformers and converters between the backpressured `Flowable` and the non-backpressured `Observable` types
- dependencies: **rxjava3-flowable**, **rxjava3-observable**, (-> **rxjava3-commons**, **reactive-streams-extensions**, **reactive-streams**)
- dependencies: **rxjava3-flowable**, **rxjava3-observable**, (-> **rxjava3-commons**, **reactive-streams-extensions**, **reactive-streams**)


## TODOs

### Work out how the snapshot release and final release works in RxJava 1/2's Nebula plugin

Currently, this preview releases manually and not in response to merging or hitting the GitHub release button. I don't really know which and how the unsupported Nebula plugin works and if it supports Gradle subprojects. Also due to the encrypted credentials, such auto-release must happen from within ReactiveX/RxJava.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ subprojects {
check.dependsOn testng

task GCandMem(dependsOn: 'check') << {
print("Memory usage before GC: ")
println(java.lang.management.ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed() / 1024.0 / 1024.0)
System.gc()
Thread.sleep(200)
print("Memory usage: ")
Expand Down
37 changes: 18 additions & 19 deletions common/src/main/java/io/reactivex/common/RxJavaCommonPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package io.reactivex.common;

import io.reactivex.common.annotations.Experimental;
import io.reactivex.common.annotations.NonNull;
import io.reactivex.common.annotations.Nullable;
import io.reactivex.common.exceptions.*;
Expand Down Expand Up @@ -98,10 +97,10 @@ public static boolean isLockdown() {
* Enables or disables the blockingX operators to fail
* with an IllegalStateException on a non-blocking
* scheduler such as computation or single.
* <p>History: 2.0.5 - experimental
* @param enable enable or disable the feature
* @since 2.0.5 - experimental
* @since 2.1
*/
@Experimental
public static void setFailOnNonBlockingScheduler(boolean enable) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
Expand All @@ -113,10 +112,10 @@ public static void setFailOnNonBlockingScheduler(boolean enable) {
* Returns true if the blockingX operators fail
* with an IllegalStateException on a non-blocking scheduler
* such as computation or single.
* <p>History: 2.0.5 - experimental
* @return true if the blockingX operators fail on a non-blocking scheduler
* @since 2.0.5 - experimental
* @since 2.1
*/
@Experimental
public static boolean isFailOnNonBlockingScheduler() {
return failNonBlockingScheduler;
}
Expand Down Expand Up @@ -566,11 +565,11 @@ public static void setSingleSchedulerHandler(@Nullable Function<? super Schedule
* such as awaiting a condition or signal
* and should return true to indicate the operator
* should not block but throw an IllegalArgumentException.
* <p>History: 2.0. - experimental
* @return true if the blocking should be prevented
* @see #setFailOnNonBlockingScheduler(boolean)
* @since 2.0.5 - experimental
* @since 2.1
*/
@Experimental
public static boolean onBeforeBlocking() {
BooleanSupplier f = onBeforeBlocking;
if (f != null) {
Expand All @@ -587,12 +586,12 @@ public static boolean onBeforeBlocking() {
* Set the handler that is called when an operator attempts a blocking
* await; the handler should return true to prevent the blocking
* and to signal an IllegalStateException instead.
* <p>History: 2.0.5 - experimental
* @param handler the handler to set, null resets to the default handler
* that always returns false
* @see #onBeforeBlocking()
* @since 2.0.5 - experimental
* @since 2.1
*/
@Experimental
public static void setOnBeforeBlocking(@Nullable BooleanSupplier handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
Expand All @@ -603,10 +602,10 @@ public static void setOnBeforeBlocking(@Nullable BooleanSupplier handler) {
/**
* Returns the current blocking handler or null if no custom handler
* is set.
* <p>History: 2.0.5 - experimental
* @return the current blocking handler or null if not specified
* @since 2.0.5 - experimental
* @since 2.1
*/
@Experimental
@Nullable
public static BooleanSupplier getOnBeforeBlocking() {
return onBeforeBlocking;
Expand All @@ -615,12 +614,12 @@ public static BooleanSupplier getOnBeforeBlocking() {
/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()}
* except using {@code threadFactory} for thread creation.
* <p>History: 2.0.5 - experimental
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
* @since 2.0.5 - experimental
* @since 2.1
*/
@Experimental
@NonNull
public static Scheduler createComputationScheduler(@NonNull ThreadFactory threadFactory) {
return new ComputationScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
Expand All @@ -629,12 +628,12 @@ public static Scheduler createComputationScheduler(@NonNull ThreadFactory thread
/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#io()}
* except using {@code threadFactory} for thread creation.
* <p>History: 2.0.5 - experimental
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
* @since 2.0.5 - experimental
* @since 2.1
*/
@Experimental
@NonNull
public static Scheduler createIoScheduler(@NonNull ThreadFactory threadFactory) {
return new IoScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
Expand All @@ -643,12 +642,12 @@ public static Scheduler createIoScheduler(@NonNull ThreadFactory threadFactory)
/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#newThread()}
* except using {@code threadFactory} for thread creation.
* <p>History: 2.0.5 - experimental
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
* @since 2.0.5 - experimental
* @since 2.1
*/
@Experimental
@NonNull
public static Scheduler createNewThreadScheduler(@NonNull ThreadFactory threadFactory) {
return new NewThreadScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
Expand All @@ -657,12 +656,12 @@ public static Scheduler createNewThreadScheduler(@NonNull ThreadFactory threadFa
/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#single()}
* except using {@code threadFactory} for thread creation.
* <p>History: 2.0.5 - experimental
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
* @since 2.0.5 - experimental
* @since 2.1
*/
@Experimental
@NonNull
public static Scheduler createSingleScheduler(@NonNull ThreadFactory threadFactory) {
return new SingleScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
Expand Down
4 changes: 2 additions & 2 deletions common/src/main/java/io/reactivex/common/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ public static Scheduler single() {
* <p>
* Starting, stopping and restarting this scheduler is not supported (no-op) and the provided
* executor's lifecycle must be managed externally:
* <code><pre>
* <pre><code>
* ExecutorService exec = Executors.newSingleThreadedExecutor();
* try {
* Scheduler scheduler = Schedulers.from(exec);
Expand All @@ -324,7 +324,7 @@ public static Scheduler single() {
* } finally {
* exec.shutdown();
* }
* </pre></code>
* </code></pre>
* <p>
* This type of scheduler is less sensitive to leaking {@link io.reactivex.common.Scheduler.Worker} instances, although
* not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
Expand Down
47 changes: 23 additions & 24 deletions common/src/main/java/io/reactivex/common/TestConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.*;
import java.util.concurrent.*;

import io.reactivex.common.annotations.Experimental;
import io.reactivex.common.exceptions.CompositeException;
import io.reactivex.common.functions.Predicate;
import io.reactivex.common.internal.functions.*;
Expand Down Expand Up @@ -336,11 +335,11 @@ public final U assertValue(T value) {
* Assert that this TestObserver/TestObserver did not receive an onNext value which is equal to
* the given value with respect to Objects.equals.
*
* @since 2.0.5 - experimental
* <p>History: 2.0.5 - experimental
* @since 2.1
* @param value the value to expect not being received
* @return this;
*/
@Experimental
@SuppressWarnings("unchecked")
public final U assertNever(T value) {
int s = values.size();
Expand Down Expand Up @@ -377,12 +376,12 @@ public final U assertValue(Predicate<T> valuePredicate) {
* Asserts that this TestObserver/TestObserver did not receive any onNext value for which
* the provided predicate returns true.
*
* @since 2.0.5 - experimental
* <p>History: 2.0.5 - experimental
* @since 2.1
* @param valuePredicate the predicate that receives the onNext value
* and should return true for the expected value.
* @return this
*/
@Experimental
@SuppressWarnings("unchecked")
public final U assertNever(Predicate<? super T> valuePredicate) {
int s = values.size();
Expand Down Expand Up @@ -548,7 +547,7 @@ public final U assertValueSequence(Iterable<? extends T> sequence) {
throw fail("More values received than expected (" + i + ")");
}
if (expectedNext) {
throw fail("Fever values received than expected (" + i + ")");
throw fail("Fewer values received than expected (" + i + ")");
}
return (U)this;
}
Expand Down Expand Up @@ -780,12 +779,12 @@ public final U assertEmpty() {
/**
* Set the tag displayed along with an assertion failure's
* other state information.
* <p>History: 2.0.7 - experimental
* @param tag the string to display (null won't print any tag)
* @return this
* @since 2.0.7 - experimental
* @since 2.1
*/
@SuppressWarnings("unchecked")
@Experimental
public final U withTag(CharSequence tag) {
this.tag = tag;
return (U)this;
Expand All @@ -794,9 +793,9 @@ public final U withTag(CharSequence tag) {
/**
* Enumeration of default wait strategies when waiting for a specific number of
* items in {@link TestConsumer#awaitCount(int, Runnable)}.
* @since 2.0.7 - experimental
* <p>History: 2.0.7 - experimental
* @since 2.1
*/
@Experimental
public enum TestWaitStrategy implements Runnable {
/** The wait loop will spin as fast as possible. */
SPIN {
Expand Down Expand Up @@ -859,12 +858,12 @@ static void sleep(int millis) {
* Await until the TestObserver/TestObserver receives the given
* number of items or terminates by sleeping 10 milliseconds at a time
* up to 5000 milliseconds of timeout.
* <p>History: 2.0.7 - experimental
* @param atLeast the number of items expected at least
* @return this
* @see #awaitCount(int, Runnable, long)
* @since 2.0.7 - experimental
* @since 2.1
*/
@Experimental
public final U awaitCount(int atLeast) {
return awaitCount(atLeast, TestWaitStrategy.SLEEP_10MS, 5000);
}
Expand All @@ -873,23 +872,24 @@ public final U awaitCount(int atLeast) {
* Await until the TestObserver/TestObserver receives the given
* number of items or terminates by waiting according to the wait
* strategy and up to 5000 milliseconds of timeout.
* <p>History: 2.0.7 - experimental
* @param atLeast the number of items expected at least
* @param waitStrategy a Runnable called when the current received count
* hasn't reached the expected value and there was
* no terminal event either, see {@link TestWaitStrategy}
* for examples
* @return this
* @see #awaitCount(int, Runnable, long)
* @since 2.0.7 - experimental
* @since 2.1
*/
@Experimental
public final U awaitCount(int atLeast, Runnable waitStrategy) {
return awaitCount(atLeast, waitStrategy, 5000);
}

/**
* Await until the TestObserver/TestObserver receives the given
* number of items or terminates.
* <p>History: 2.0.7 - experimental
* @param atLeast the number of items expected at least
* @param waitStrategy a Runnable called when the current received count
* hasn't reached the expected value and there was
Expand All @@ -898,10 +898,9 @@ public final U awaitCount(int atLeast, Runnable waitStrategy) {
* @param timeoutMillis if positive, the await ends if the specified amount of
* time has passed no matter how many items were received
* @return this
* @since 2.0.7 - experimental
* @since 2.1
*/
@SuppressWarnings("unchecked")
@Experimental
public final U awaitCount(int atLeast, Runnable waitStrategy, long timeoutMillis) {
long start = System.currentTimeMillis();
for (;;) {
Expand All @@ -922,37 +921,37 @@ public final U awaitCount(int atLeast, Runnable waitStrategy, long timeoutMillis
}

/**
* <p>History: 2.0.7 - experimental
* @return true if one of the timeout-based await methods has timed out.
* @see #clearTimeout()
* @see #assertTimeout()
* @see #assertNoTimeout()
* @since 2.0.7 - experimental
* @since 2.1
*/
@Experimental
public final boolean isTimeout() {
return timeout;
}

/**
* Clears the timeout flag set by the await methods when they timed out.
* <p>History: 2.0.7 - experimental
* @return this
* @since 2.0.7 - experimental
* @since 2.1
* @see #isTimeout()
*/
@SuppressWarnings("unchecked")
@Experimental
public final U clearTimeout() {
timeout = false;
return (U)this;
}

/**
* Asserts that some awaitX method has timed out.
* <p>History: 2.0.7 - experimental
* @return this
* @since 2.0.7 - experimental
* @since 2.1
*/
@SuppressWarnings("unchecked")
@Experimental
public final U assertTimeout() {
if (!timeout) {
throw fail("No timeout?!");
Expand All @@ -963,11 +962,11 @@ public final U assertTimeout() {

/**
* Asserts that some awaitX method has not timed out.
* <p>History: 2.0.7 - experimental
* @return this
* @since 2.0.7 - experimental
* @since 2.1
*/
@SuppressWarnings("unchecked")
@Experimental
public final U assertNoTimeout() {
if (timeout) {
throw fail("Timeout?!");
Expand Down
Loading

0 comments on commit 3d04b80

Please sign in to comment.