diff --git a/.travis.yml b/.travis.yml
index 762c980..2a1fc00 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -10,6 +10,7 @@ before_install:
- chmod +x gradlew
script:
+ - export GRADLE_OPTS=-Xmx1024m
- ./gradlew assemble --stacktrace
- ./gradlew check jacocoFullReport --stacktrace
diff --git a/README.md b/README.md
index 5e5df0b..f8c4c23 100644
--- a/README.md
+++ b/README.md
@@ -10,16 +10,16 @@ Preview for version 3 of RxJava, the modern ReactiveX style library for composin
```groovy
// shared components
-compile "com.github.akarnokd:rxjava3-common:0.1.0"
+compile "com.github.akarnokd:rxjava3-common:0.2.0"
// Flowable only
-compile "com.github.akarnokd:rxjava3-flowable:0.1.0"
+compile "com.github.akarnokd:rxjava3-flowable:0.2.0"
// Observable, Single, Maybe, Completable
-compile "com.github.akarnokd:rxjava3-observable:0.1.0"
+compile "com.github.akarnokd:rxjava3-observable:0.2.0"
// Interoperation between Flowable and the rest
-compile "com.github.akarnokd:rxjava3-interop:0.1.0"
+compile "com.github.akarnokd:rxjava3-interop:0.2.0"
```
## Structure
@@ -45,4 +45,11 @@ This is an unofficial preparation place for RxJava 3 where the major change is t
- dependencies: **rxjava3-commons**
- `rxjava3-interop`
- transformers and converters between the backpressured `Flowable` and the non-backpressured `Observable` types
- - dependencies: **rxjava3-flowable**, **rxjava3-observable**, (-> **rxjava3-commons**, **reactive-streams-extensions**, **reactive-streams**)
+ - dependencies: **rxjava3-flowable**, **rxjava3-observable**, (-> **rxjava3-commons**, **reactive-streams-extensions**, **reactive-streams**)
+
+
+## TODOs
+
+### Work out how the snapshot release and final release works in RxJava 1/2's Nebula plugin
+
+Currently, this preview releases manually and not in response to merging or hitting the GitHub release button. I don't really know which and how the unsupported Nebula plugin works and if it supports Gradle subprojects. Also due to the encrypted credentials, such auto-release must happen from within ReactiveX/RxJava.
\ No newline at end of file
diff --git a/build.gradle b/build.gradle
index 8b32ca6..afcd230 100644
--- a/build.gradle
+++ b/build.gradle
@@ -205,6 +205,8 @@ subprojects {
check.dependsOn testng
task GCandMem(dependsOn: 'check') << {
+ print("Memory usage before GC: ")
+ println(java.lang.management.ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed() / 1024.0 / 1024.0)
System.gc()
Thread.sleep(200)
print("Memory usage: ")
diff --git a/common/src/main/java/io/reactivex/common/RxJavaCommonPlugins.java b/common/src/main/java/io/reactivex/common/RxJavaCommonPlugins.java
index bc98e07..35160bb 100644
--- a/common/src/main/java/io/reactivex/common/RxJavaCommonPlugins.java
+++ b/common/src/main/java/io/reactivex/common/RxJavaCommonPlugins.java
@@ -12,7 +12,6 @@
*/
package io.reactivex.common;
-import io.reactivex.common.annotations.Experimental;
import io.reactivex.common.annotations.NonNull;
import io.reactivex.common.annotations.Nullable;
import io.reactivex.common.exceptions.*;
@@ -98,10 +97,10 @@ public static boolean isLockdown() {
* Enables or disables the blockingX operators to fail
* with an IllegalStateException on a non-blocking
* scheduler such as computation or single.
+ *
History: 2.0.5 - experimental
* @param enable enable or disable the feature
- * @since 2.0.5 - experimental
+ * @since 2.1
*/
- @Experimental
public static void setFailOnNonBlockingScheduler(boolean enable) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
@@ -113,10 +112,10 @@ public static void setFailOnNonBlockingScheduler(boolean enable) {
* Returns true if the blockingX operators fail
* with an IllegalStateException on a non-blocking scheduler
* such as computation or single.
+ *
History: 2.0.5 - experimental
* @return true if the blockingX operators fail on a non-blocking scheduler
- * @since 2.0.5 - experimental
+ * @since 2.1
*/
- @Experimental
public static boolean isFailOnNonBlockingScheduler() {
return failNonBlockingScheduler;
}
@@ -566,11 +565,11 @@ public static void setSingleSchedulerHandler(@Nullable Function super Schedule
* such as awaiting a condition or signal
* and should return true to indicate the operator
* should not block but throw an IllegalArgumentException.
+ *
History: 2.0. - experimental
* @return true if the blocking should be prevented
* @see #setFailOnNonBlockingScheduler(boolean)
- * @since 2.0.5 - experimental
+ * @since 2.1
*/
- @Experimental
public static boolean onBeforeBlocking() {
BooleanSupplier f = onBeforeBlocking;
if (f != null) {
@@ -587,12 +586,12 @@ public static boolean onBeforeBlocking() {
* Set the handler that is called when an operator attempts a blocking
* await; the handler should return true to prevent the blocking
* and to signal an IllegalStateException instead.
+ *
History: 2.0.5 - experimental
* @param handler the handler to set, null resets to the default handler
* that always returns false
* @see #onBeforeBlocking()
- * @since 2.0.5 - experimental
+ * @since 2.1
*/
- @Experimental
public static void setOnBeforeBlocking(@Nullable BooleanSupplier handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
@@ -603,10 +602,10 @@ public static void setOnBeforeBlocking(@Nullable BooleanSupplier handler) {
/**
* Returns the current blocking handler or null if no custom handler
* is set.
+ *
History: 2.0.5 - experimental
* @return the current blocking handler or null if not specified
- * @since 2.0.5 - experimental
+ * @since 2.1
*/
- @Experimental
@Nullable
public static BooleanSupplier getOnBeforeBlocking() {
return onBeforeBlocking;
@@ -615,12 +614,12 @@ public static BooleanSupplier getOnBeforeBlocking() {
/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#computation()}
* except using {@code threadFactory} for thread creation.
+ *
History: 2.0.5 - experimental
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
- * @since 2.0.5 - experimental
+ * @since 2.1
*/
- @Experimental
@NonNull
public static Scheduler createComputationScheduler(@NonNull ThreadFactory threadFactory) {
return new ComputationScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
@@ -629,12 +628,12 @@ public static Scheduler createComputationScheduler(@NonNull ThreadFactory thread
/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#io()}
* except using {@code threadFactory} for thread creation.
+ *
History: 2.0.5 - experimental
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
- * @since 2.0.5 - experimental
+ * @since 2.1
*/
- @Experimental
@NonNull
public static Scheduler createIoScheduler(@NonNull ThreadFactory threadFactory) {
return new IoScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
@@ -643,12 +642,12 @@ public static Scheduler createIoScheduler(@NonNull ThreadFactory threadFactory)
/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#newThread()}
* except using {@code threadFactory} for thread creation.
+ *
History: 2.0.5 - experimental
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
- * @since 2.0.5 - experimental
+ * @since 2.1
*/
- @Experimental
@NonNull
public static Scheduler createNewThreadScheduler(@NonNull ThreadFactory threadFactory) {
return new NewThreadScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
@@ -657,12 +656,12 @@ public static Scheduler createNewThreadScheduler(@NonNull ThreadFactory threadFa
/**
* Create an instance of the default {@link Scheduler} used for {@link Schedulers#single()}
* except using {@code threadFactory} for thread creation.
+ *
History: 2.0.5 - experimental
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
* @return the created Scheduler instance
- * @since 2.0.5 - experimental
+ * @since 2.1
*/
- @Experimental
@NonNull
public static Scheduler createSingleScheduler(@NonNull ThreadFactory threadFactory) {
return new SingleScheduler(ObjectHelper.requireNonNull(threadFactory, "threadFactory is null"));
diff --git a/common/src/main/java/io/reactivex/common/Schedulers.java b/common/src/main/java/io/reactivex/common/Schedulers.java
index 1de06d4..3c5a3d4 100644
--- a/common/src/main/java/io/reactivex/common/Schedulers.java
+++ b/common/src/main/java/io/reactivex/common/Schedulers.java
@@ -312,7 +312,7 @@ public static Scheduler single() {
*
* Starting, stopping and restarting this scheduler is not supported (no-op) and the provided
* executor's lifecycle must be managed externally:
- *
+ *
* ExecutorService exec = Executors.newSingleThreadedExecutor();
* try {
* Scheduler scheduler = Schedulers.from(exec);
@@ -324,7 +324,7 @@ public static Scheduler single() {
* } finally {
* exec.shutdown();
* }
- *
+ *
*
* This type of scheduler is less sensitive to leaking {@link io.reactivex.common.Scheduler.Worker} instances, although
* not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
diff --git a/common/src/main/java/io/reactivex/common/TestConsumer.java b/common/src/main/java/io/reactivex/common/TestConsumer.java
index 9d95462..85260ce 100644
--- a/common/src/main/java/io/reactivex/common/TestConsumer.java
+++ b/common/src/main/java/io/reactivex/common/TestConsumer.java
@@ -16,7 +16,6 @@
import java.util.*;
import java.util.concurrent.*;
-import io.reactivex.common.annotations.Experimental;
import io.reactivex.common.exceptions.CompositeException;
import io.reactivex.common.functions.Predicate;
import io.reactivex.common.internal.functions.*;
@@ -336,11 +335,11 @@ public final U assertValue(T value) {
* Assert that this TestObserver/TestObserver did not receive an onNext value which is equal to
* the given value with respect to Objects.equals.
*
- * @since 2.0.5 - experimental
+ *
History: 2.0.5 - experimental
+ * @since 2.1
* @param value the value to expect not being received
* @return this;
*/
- @Experimental
@SuppressWarnings("unchecked")
public final U assertNever(T value) {
int s = values.size();
@@ -377,12 +376,12 @@ public final U assertValue(Predicate valuePredicate) {
* Asserts that this TestObserver/TestObserver did not receive any onNext value for which
* the provided predicate returns true.
*
- * @since 2.0.5 - experimental
+ * History: 2.0.5 - experimental
+ * @since 2.1
* @param valuePredicate the predicate that receives the onNext value
* and should return true for the expected value.
* @return this
*/
- @Experimental
@SuppressWarnings("unchecked")
public final U assertNever(Predicate super T> valuePredicate) {
int s = values.size();
@@ -548,7 +547,7 @@ public final U assertValueSequence(Iterable extends T> sequence) {
throw fail("More values received than expected (" + i + ")");
}
if (expectedNext) {
- throw fail("Fever values received than expected (" + i + ")");
+ throw fail("Fewer values received than expected (" + i + ")");
}
return (U)this;
}
@@ -780,12 +779,12 @@ public final U assertEmpty() {
/**
* Set the tag displayed along with an assertion failure's
* other state information.
+ *
History: 2.0.7 - experimental
* @param tag the string to display (null won't print any tag)
* @return this
- * @since 2.0.7 - experimental
+ * @since 2.1
*/
@SuppressWarnings("unchecked")
- @Experimental
public final U withTag(CharSequence tag) {
this.tag = tag;
return (U)this;
@@ -794,9 +793,9 @@ public final U withTag(CharSequence tag) {
/**
* Enumeration of default wait strategies when waiting for a specific number of
* items in {@link TestConsumer#awaitCount(int, Runnable)}.
- * @since 2.0.7 - experimental
+ *
History: 2.0.7 - experimental
+ * @since 2.1
*/
- @Experimental
public enum TestWaitStrategy implements Runnable {
/** The wait loop will spin as fast as possible. */
SPIN {
@@ -859,12 +858,12 @@ static void sleep(int millis) {
* Await until the TestObserver/TestObserver receives the given
* number of items or terminates by sleeping 10 milliseconds at a time
* up to 5000 milliseconds of timeout.
+ *
History: 2.0.7 - experimental
* @param atLeast the number of items expected at least
* @return this
* @see #awaitCount(int, Runnable, long)
- * @since 2.0.7 - experimental
+ * @since 2.1
*/
- @Experimental
public final U awaitCount(int atLeast) {
return awaitCount(atLeast, TestWaitStrategy.SLEEP_10MS, 5000);
}
@@ -873,6 +872,7 @@ public final U awaitCount(int atLeast) {
* Await until the TestObserver/TestObserver receives the given
* number of items or terminates by waiting according to the wait
* strategy and up to 5000 milliseconds of timeout.
+ *
History: 2.0.7 - experimental
* @param atLeast the number of items expected at least
* @param waitStrategy a Runnable called when the current received count
* hasn't reached the expected value and there was
@@ -880,9 +880,8 @@ public final U awaitCount(int atLeast) {
* for examples
* @return this
* @see #awaitCount(int, Runnable, long)
- * @since 2.0.7 - experimental
+ * @since 2.1
*/
- @Experimental
public final U awaitCount(int atLeast, Runnable waitStrategy) {
return awaitCount(atLeast, waitStrategy, 5000);
}
@@ -890,6 +889,7 @@ public final U awaitCount(int atLeast, Runnable waitStrategy) {
/**
* Await until the TestObserver/TestObserver receives the given
* number of items or terminates.
+ *
History: 2.0.7 - experimental
* @param atLeast the number of items expected at least
* @param waitStrategy a Runnable called when the current received count
* hasn't reached the expected value and there was
@@ -898,10 +898,9 @@ public final U awaitCount(int atLeast, Runnable waitStrategy) {
* @param timeoutMillis if positive, the await ends if the specified amount of
* time has passed no matter how many items were received
* @return this
- * @since 2.0.7 - experimental
+ * @since 2.1
*/
@SuppressWarnings("unchecked")
- @Experimental
public final U awaitCount(int atLeast, Runnable waitStrategy, long timeoutMillis) {
long start = System.currentTimeMillis();
for (;;) {
@@ -922,25 +921,25 @@ public final U awaitCount(int atLeast, Runnable waitStrategy, long timeoutMillis
}
/**
+ *
History: 2.0.7 - experimental
* @return true if one of the timeout-based await methods has timed out.
* @see #clearTimeout()
* @see #assertTimeout()
* @see #assertNoTimeout()
- * @since 2.0.7 - experimental
+ * @since 2.1
*/
- @Experimental
public final boolean isTimeout() {
return timeout;
}
/**
* Clears the timeout flag set by the await methods when they timed out.
+ *
History: 2.0.7 - experimental
* @return this
- * @since 2.0.7 - experimental
+ * @since 2.1
* @see #isTimeout()
*/
@SuppressWarnings("unchecked")
- @Experimental
public final U clearTimeout() {
timeout = false;
return (U)this;
@@ -948,11 +947,11 @@ public final U clearTimeout() {
/**
* Asserts that some awaitX method has timed out.
+ *
History: 2.0.7 - experimental
* @return this
- * @since 2.0.7 - experimental
+ * @since 2.1
*/
@SuppressWarnings("unchecked")
- @Experimental
public final U assertTimeout() {
if (!timeout) {
throw fail("No timeout?!");
@@ -963,11 +962,11 @@ public final U assertTimeout() {
/**
* Asserts that some awaitX method has not timed out.
+ *
History: 2.0.7 - experimental
* @return this
- * @since 2.0.7 - experimental
+ * @since 2.1
*/
@SuppressWarnings("unchecked")
- @Experimental
public final U assertNoTimeout() {
if (timeout) {
throw fail("Timeout?!");
diff --git a/common/src/main/java/io/reactivex/common/annotations/CheckReturnValue.java b/common/src/main/java/io/reactivex/common/annotations/CheckReturnValue.java
index 916e117..8c05fdd 100644
--- a/common/src/main/java/io/reactivex/common/annotations/CheckReturnValue.java
+++ b/common/src/main/java/io/reactivex/common/annotations/CheckReturnValue.java
@@ -22,12 +22,12 @@
/**
* Marks methods whose return values should be checked.
*
- * @since 2.0.2 - experimental
+ *
History: 2.0.2 - experimental
+ * @since 2.1
*/
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Target(ElementType.METHOD)
-@Experimental
public @interface CheckReturnValue {
}
diff --git a/common/src/main/java/io/reactivex/common/annotations/SchedulerSupport.java b/common/src/main/java/io/reactivex/common/annotations/SchedulerSupport.java
index 9ebba1c..38158fb 100644
--- a/common/src/main/java/io/reactivex/common/annotations/SchedulerSupport.java
+++ b/common/src/main/java/io/reactivex/common/annotations/SchedulerSupport.java
@@ -62,9 +62,9 @@
/**
* The operator/class runs on RxJava's {@linkplain Schedulers#single() single scheduler}
* or takes timing information from it.
- * @since 2.0.8 - experimental
+ *
History: 2.0.8 - experimental
+ * @since 2.1
*/
- @Experimental
String SINGLE = "io.reactivex:single";
/**
diff --git a/common/src/main/java/io/reactivex/common/exceptions/OnErrorNotImplementedException.java b/common/src/main/java/io/reactivex/common/exceptions/OnErrorNotImplementedException.java
index 341cd31..c5068eb 100644
--- a/common/src/main/java/io/reactivex/common/exceptions/OnErrorNotImplementedException.java
+++ b/common/src/main/java/io/reactivex/common/exceptions/OnErrorNotImplementedException.java
@@ -19,9 +19,9 @@
* Represents an exception used to signal to the {@code RxJavaCommonPlugins.onError()} that a
* callback-based subscribe() method on a base reactive type didn't specify
* an onError handler.
- * @since 2.0.6 - experimental
+ *
History: 2.0.7 - experimental
+ * @since 2.1
*/
-@Experimental
public final class OnErrorNotImplementedException extends RuntimeException {
private static final long serialVersionUID = -6298857009889503852L;
diff --git a/common/src/main/java/io/reactivex/common/exceptions/ProtocolViolationException.java b/common/src/main/java/io/reactivex/common/exceptions/ProtocolViolationException.java
index e71a0ae..7c17090 100644
--- a/common/src/main/java/io/reactivex/common/exceptions/ProtocolViolationException.java
+++ b/common/src/main/java/io/reactivex/common/exceptions/ProtocolViolationException.java
@@ -13,14 +13,12 @@
package io.reactivex.common.exceptions;
-import io.reactivex.common.annotations.Experimental;
-
/**
* Explicitly named exception to indicate a Reactive-Streams
* protocol violation.
- * @since 2.0.6 - experimental
+ *
History: 2.0.6 - experimental
+ * @since 2.1
*/
-@Experimental
public final class ProtocolViolationException extends IllegalStateException {
private static final long serialVersionUID = 1644750035281290266L;
diff --git a/common/src/main/java/io/reactivex/common/exceptions/UndeliverableException.java b/common/src/main/java/io/reactivex/common/exceptions/UndeliverableException.java
index 81c0f20..4b7a675 100644
--- a/common/src/main/java/io/reactivex/common/exceptions/UndeliverableException.java
+++ b/common/src/main/java/io/reactivex/common/exceptions/UndeliverableException.java
@@ -13,13 +13,11 @@
package io.reactivex.common.exceptions;
-import io.reactivex.common.annotations.Experimental;
-
/**
* Wrapper for Throwable errors that are sent to `RxJavaCommonPlugins.onError`.
- * @since 2.0.6 - experimental
+ *
History: 2.0.6 - experimental
+ * @since 2.1
*/
-@Experimental
public final class UndeliverableException extends IllegalStateException {
private static final long serialVersionUID = 1644750035281290266L;
diff --git a/common/src/main/java/io/reactivex/common/functions/Consumer.java b/common/src/main/java/io/reactivex/common/functions/Consumer.java
index 2ebef842..9e32af0 100644
--- a/common/src/main/java/io/reactivex/common/functions/Consumer.java
+++ b/common/src/main/java/io/reactivex/common/functions/Consumer.java
@@ -13,8 +13,6 @@
package io.reactivex.common.functions;
-import io.reactivex.common.annotations.NonNull;
-
/**
* A functional interface (callback) that accepts a single value.
* @param the value type
@@ -25,5 +23,5 @@ public interface Consumer {
* @param t the value
* @throws Exception on error
*/
- void accept(@NonNull T t) throws Exception;
+ void accept(T t) throws Exception;
}
diff --git a/common/src/main/java/io/reactivex/common/functions/Function.java b/common/src/main/java/io/reactivex/common/functions/Function.java
index df1c687..d20713d 100644
--- a/common/src/main/java/io/reactivex/common/functions/Function.java
+++ b/common/src/main/java/io/reactivex/common/functions/Function.java
@@ -29,6 +29,5 @@ public interface Function {
* @return the output value
* @throws Exception on error
*/
- @NonNull
R apply(@NonNull T t) throws Exception;
}
diff --git a/common/src/main/java/io/reactivex/common/functions/Function3.java b/common/src/main/java/io/reactivex/common/functions/Function3.java
index 7283f76..0a0c649 100644
--- a/common/src/main/java/io/reactivex/common/functions/Function3.java
+++ b/common/src/main/java/io/reactivex/common/functions/Function3.java
@@ -19,7 +19,7 @@
* A functional interface (callback) that computes a value based on multiple input values.
* @param the first value type
* @param the second value type
- * @param the second value type
+ * @param the third value type
* @param the result type
*/
public interface Function3 {
diff --git a/common/src/main/java/io/reactivex/common/functions/Function4.java b/common/src/main/java/io/reactivex/common/functions/Function4.java
index 33428ad..9264850 100644
--- a/common/src/main/java/io/reactivex/common/functions/Function4.java
+++ b/common/src/main/java/io/reactivex/common/functions/Function4.java
@@ -19,8 +19,8 @@
* A functional interface (callback) that computes a value based on multiple input values.
* @param the first value type
* @param the second value type
- * @param the second value type
- * @param the second value type
+ * @param the third value type
+ * @param the fourth value type
* @param the result type
*/
public interface Function4 {
diff --git a/common/src/main/java/io/reactivex/common/functions/Function5.java b/common/src/main/java/io/reactivex/common/functions/Function5.java
index 1670336..93bb17c 100644
--- a/common/src/main/java/io/reactivex/common/functions/Function5.java
+++ b/common/src/main/java/io/reactivex/common/functions/Function5.java
@@ -19,9 +19,9 @@
* A functional interface (callback) that computes a value based on multiple input values.
* @param the first value type
* @param the second value type
- * @param the second value type
- * @param the second value type
- * @param the second value type
+ * @param the third value type
+ * @param the fourth value type
+ * @param the fifth value type
* @param the result type
*/
public interface Function5 {
diff --git a/common/src/main/java/io/reactivex/common/functions/Function6.java b/common/src/main/java/io/reactivex/common/functions/Function6.java
index ecf18c7..dd4537d 100644
--- a/common/src/main/java/io/reactivex/common/functions/Function6.java
+++ b/common/src/main/java/io/reactivex/common/functions/Function6.java
@@ -19,10 +19,10 @@
* A functional interface (callback) that computes a value based on multiple input values.
* @param the first value type
* @param the second value type
- * @param the second value type
- * @param the second value type
- * @param the second value type
- * @param the second value type
+ * @param the third value type
+ * @param the fourth value type
+ * @param the fifth value type
+ * @param the sixth value type
* @param the result type
*/
public interface Function6 {
diff --git a/common/src/main/java/io/reactivex/common/functions/Function7.java b/common/src/main/java/io/reactivex/common/functions/Function7.java
index 84fa1da..d1dacb7 100644
--- a/common/src/main/java/io/reactivex/common/functions/Function7.java
+++ b/common/src/main/java/io/reactivex/common/functions/Function7.java
@@ -19,11 +19,11 @@
* A functional interface (callback) that computes a value based on multiple input values.
* @param the first value type
* @param the second value type
- * @param the second value type
- * @param the second value type
- * @param the second value type
- * @param the second value type
- * @param the second value type
+ * @param the third value type
+ * @param the fourth value type
+ * @param the fifth value type
+ * @param the sixth value type
+ * @param the seventh value type
* @param the result type
*/
public interface Function7 {
diff --git a/common/src/main/java/io/reactivex/common/functions/Function8.java b/common/src/main/java/io/reactivex/common/functions/Function8.java
index aa56fc1..88c7ad5 100644
--- a/common/src/main/java/io/reactivex/common/functions/Function8.java
+++ b/common/src/main/java/io/reactivex/common/functions/Function8.java
@@ -19,12 +19,12 @@
* A functional interface (callback) that computes a value based on multiple input values.
* @param the first value type
* @param the second value type
- * @param the second value type
- * @param the second value type
- * @param the second value type
- * @param the second value type
- * @param the second value type
- * @param the second value type
+ * @param the third value type
+ * @param the fourth value type
+ * @param the fifth value type
+ * @param the sixth value type
+ * @param the seventh value type
+ * @param the eighth value type
* @param the result type
*/
public interface Function8 {
diff --git a/common/src/main/java/io/reactivex/common/functions/Function9.java b/common/src/main/java/io/reactivex/common/functions/Function9.java
index 9ea7fe3..58a93f0 100644
--- a/common/src/main/java/io/reactivex/common/functions/Function9.java
+++ b/common/src/main/java/io/reactivex/common/functions/Function9.java
@@ -19,13 +19,13 @@
* A functional interface (callback) that computes a value based on multiple input values.
* @param the first value type
* @param the second value type
- * @param the second value type
- * @param the second value type
- * @param the second value type
- * @param the second value type
- * @param the second value type
- * @param the second value type
- * @param the second value type
+ * @param the third value type
+ * @param the fourth value type
+ * @param the fifth value type
+ * @param the sixth value type
+ * @param the seventh value type
+ * @param the eighth value type
+ * @param the ninth value type
* @param the result type
*/
public interface Function9 {
diff --git a/common/src/main/java/io/reactivex/common/internal/queues/AbstractMpscLinkedQueue.java b/common/src/main/java/io/reactivex/common/internal/queues/AbstractMpscLinkedQueue.java
index 6928067..97f41db 100644
--- a/common/src/main/java/io/reactivex/common/internal/queues/AbstractMpscLinkedQueue.java
+++ b/common/src/main/java/io/reactivex/common/internal/queues/AbstractMpscLinkedQueue.java
@@ -99,7 +99,7 @@ final void spConsumerNode(LinkedQueueNode node) {
consumerNode.lazySet(node);
}
- final public boolean isEmpty() {
+ public final boolean isEmpty() {
return lvConsumerNode() == lvProducerNode();
}
diff --git a/common/src/main/java/io/reactivex/common/internal/schedulers/InstantPeriodicTask.java b/common/src/main/java/io/reactivex/common/internal/schedulers/InstantPeriodicTask.java
new file mode 100644
index 0000000..0e5efcf
--- /dev/null
+++ b/common/src/main/java/io/reactivex/common/internal/schedulers/InstantPeriodicTask.java
@@ -0,0 +1,103 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.common.internal.schedulers;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.reactivex.common.*;
+import io.reactivex.common.internal.functions.Functions;
+
+/**
+ * Wrapper for a regular task that gets immediately rescheduled when the task completed.
+ */
+final class InstantPeriodicTask implements Callable, Disposable {
+
+ final Runnable task;
+
+ final AtomicReference> rest;
+
+ final AtomicReference> first;
+
+ final ExecutorService executor;
+
+ Thread runner;
+
+ static final FutureTask CANCELLED = new FutureTask(Functions.EMPTY_RUNNABLE, null);
+
+ InstantPeriodicTask(Runnable task, ExecutorService executor) {
+ super();
+ this.task = task;
+ this.first = new AtomicReference>();
+ this.rest = new AtomicReference>();
+ this.executor = executor;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ try {
+ runner = Thread.currentThread();
+ try {
+ task.run();
+ setRest(executor.submit(this));
+ } catch (Throwable ex) {
+ RxJavaCommonPlugins.onError(ex);
+ }
+ } finally {
+ runner = null;
+ }
+ return null;
+ }
+
+ @Override
+ public void dispose() {
+ Future> current = first.getAndSet(CANCELLED);
+ if (current != null && current != CANCELLED) {
+ current.cancel(runner != Thread.currentThread());
+ }
+ current = rest.getAndSet(CANCELLED);
+ if (current != null && current != CANCELLED) {
+ current.cancel(runner != Thread.currentThread());
+ }
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return first.get() == CANCELLED;
+ }
+
+ void setFirst(Future> f) {
+ for (;;) {
+ Future> current = first.get();
+ if (current == CANCELLED) {
+ f.cancel(runner != Thread.currentThread());
+ }
+ if (first.compareAndSet(current, f)) {
+ return;
+ }
+ }
+ }
+
+ void setRest(Future> f) {
+ for (;;) {
+ Future> current = rest.get();
+ if (current == CANCELLED) {
+ f.cancel(runner != Thread.currentThread());
+ }
+ if (rest.compareAndSet(current, f)) {
+ return;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/common/src/main/java/io/reactivex/common/internal/schedulers/NewThreadWorker.java b/common/src/main/java/io/reactivex/common/internal/schedulers/NewThreadWorker.java
index acff281..556d0fe 100644
--- a/common/src/main/java/io/reactivex/common/internal/schedulers/NewThreadWorker.java
+++ b/common/src/main/java/io/reactivex/common/internal/schedulers/NewThreadWorker.java
@@ -82,8 +82,27 @@ public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit un
* @param unit the time unit for both the initialDelay and period
* @return the ScheduledRunnable instance
*/
- public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDelay, long period, TimeUnit unit) {
- ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(RxJavaCommonPlugins.onSchedule(run));
+ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
+ final Runnable decoratedRun = RxJavaCommonPlugins.onSchedule(run);
+ if (period <= 0L) {
+
+ InstantPeriodicTask periodicWrapper = new InstantPeriodicTask(decoratedRun, executor);
+ try {
+ Future> f;
+ if (initialDelay <= 0L) {
+ f = executor.submit(periodicWrapper);
+ } else {
+ f = executor.schedule(periodicWrapper, initialDelay, unit);
+ }
+ periodicWrapper.setFirst(f);
+ } catch (RejectedExecutionException ex) {
+ RxJavaCommonPlugins.onError(ex);
+ return Scheduler.REJECTED;
+ }
+
+ return periodicWrapper;
+ }
+ ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
try {
Future> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
task.setFuture(f);
diff --git a/common/src/main/java/io/reactivex/common/internal/schedulers/SchedulerPoolFactory.java b/common/src/main/java/io/reactivex/common/internal/schedulers/SchedulerPoolFactory.java
index 4d73569..b1ee5ed 100644
--- a/common/src/main/java/io/reactivex/common/internal/schedulers/SchedulerPoolFactory.java
+++ b/common/src/main/java/io/reactivex/common/internal/schedulers/SchedulerPoolFactory.java
@@ -57,6 +57,9 @@ private SchedulerPoolFactory() {
* Starts the purge thread if not already started.
*/
public static void start() {
+ if (!PURGE_ENABLED) {
+ return;
+ }
for (;;) {
ScheduledExecutorService curr = PURGE_THREAD.get();
if (curr != null && !curr.isShutdown()) {
@@ -78,7 +81,10 @@ public static void start() {
* Stops the purge thread.
*/
public static void shutdown() {
- PURGE_THREAD.get().shutdownNow();
+ ScheduledExecutorService exec = PURGE_THREAD.get();
+ if (exec != null) {
+ exec.shutdownNow();
+ }
POOLS.clear();
}
@@ -90,10 +96,10 @@ public static void shutdown() {
if (properties.containsKey(PURGE_ENABLED_KEY)) {
purgeEnable = Boolean.getBoolean(PURGE_ENABLED_KEY);
+ }
- if (purgeEnable && properties.containsKey(PURGE_PERIOD_SECONDS_KEY)) {
- purgePeriod = Integer.getInteger(PURGE_PERIOD_SECONDS_KEY, purgePeriod);
- }
+ if (purgeEnable && properties.containsKey(PURGE_PERIOD_SECONDS_KEY)) {
+ purgePeriod = Integer.getInteger(PURGE_PERIOD_SECONDS_KEY, purgePeriod);
}
PURGE_ENABLED = purgeEnable;
@@ -109,7 +115,7 @@ public static void shutdown() {
*/
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
- if (exec instanceof ScheduledThreadPoolExecutor) {
+ if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
diff --git a/common/src/main/java/io/reactivex/common/internal/schedulers/SingleScheduler.java b/common/src/main/java/io/reactivex/common/internal/schedulers/SingleScheduler.java
index df7c8f7..2358c24 100644
--- a/common/src/main/java/io/reactivex/common/internal/schedulers/SingleScheduler.java
+++ b/common/src/main/java/io/reactivex/common/internal/schedulers/SingleScheduler.java
@@ -123,7 +123,28 @@ public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit uni
@NonNull
@Override
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
- ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(RxJavaCommonPlugins.onSchedule(run));
+ final Runnable decoratedRun = RxJavaCommonPlugins.onSchedule(run);
+ if (period <= 0L) {
+
+ ScheduledExecutorService exec = executor.get();
+
+ InstantPeriodicTask periodicWrapper = new InstantPeriodicTask(decoratedRun, exec);
+ Future> f;
+ try {
+ if (initialDelay <= 0L) {
+ f = exec.submit(periodicWrapper);
+ } else {
+ f = exec.schedule(periodicWrapper, initialDelay, unit);
+ }
+ periodicWrapper.setFirst(f);
+ } catch (RejectedExecutionException ex) {
+ RxJavaCommonPlugins.onError(ex);
+ return REJECTED;
+ }
+
+ return periodicWrapper;
+ }
+ ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
try {
Future> f = executor.get().scheduleAtFixedRate(task, initialDelay, period, unit);
task.setFuture(f);
diff --git a/common/src/main/java/io/reactivex/common/internal/schedulers/TrampolineScheduler.java b/common/src/main/java/io/reactivex/common/internal/schedulers/TrampolineScheduler.java
index 73fb1db..df25240 100644
--- a/common/src/main/java/io/reactivex/common/internal/schedulers/TrampolineScheduler.java
+++ b/common/src/main/java/io/reactivex/common/internal/schedulers/TrampolineScheduler.java
@@ -107,6 +107,10 @@ Disposable enqueue(Runnable action, long execTime) {
int missed = 1;
for (;;) {
for (;;) {
+ if (disposed) {
+ queue.clear();
+ return DONE;
+ }
final TimedRunnable polled = queue.poll();
if (polled == null) {
break;
diff --git a/common/src/main/java/io/reactivex/common/internal/utils/ExceptionHelper.java b/common/src/main/java/io/reactivex/common/internal/utils/ExceptionHelper.java
index 32fefe9..979a078 100644
--- a/common/src/main/java/io/reactivex/common/internal/utils/ExceptionHelper.java
+++ b/common/src/main/java/io/reactivex/common/internal/utils/ExceptionHelper.java
@@ -106,6 +106,21 @@ public static List flatten(Throwable t) {
return list;
}
+ /**
+ * Workaround for Java 6 not supporting throwing a final Throwable from a catch block.
+ * @param the generic exception type
+ * @param e the Throwable error to return or throw
+ * @return the Throwable e if it is a subclass of Exception
+ * @throws E the generic exception thrown
+ */
+ @SuppressWarnings("unchecked")
+ public static Exception throwIfThrowable(Throwable e) throws E {
+ if (e instanceof Exception) {
+ return (Exception)e;
+ }
+ throw (E)e;
+ }
+
static final class Termination extends Throwable {
private static final long serialVersionUID = -4649703670690200604L;
diff --git a/common/src/main/java/io/reactivex/common/internal/utils/VolatileSizeArrayList.java b/common/src/main/java/io/reactivex/common/internal/utils/VolatileSizeArrayList.java
index d2130c4..9c978dc 100644
--- a/common/src/main/java/io/reactivex/common/internal/utils/VolatileSizeArrayList.java
+++ b/common/src/main/java/io/reactivex/common/internal/utils/VolatileSizeArrayList.java
@@ -22,7 +22,7 @@
* @param the element type
* @since 2.0.7
*/
-public final class VolatileSizeArrayList extends AtomicInteger implements List {
+public final class VolatileSizeArrayList extends AtomicInteger implements List, RandomAccess {
private static final long serialVersionUID = 3972397474470203923L;
diff --git a/common/src/test/java/io/reactivex/common/RxJavaCommonPluginsTest.java b/common/src/test/java/io/reactivex/common/RxJavaCommonPluginsTest.java
index fa55df8..de923e6 100644
--- a/common/src/test/java/io/reactivex/common/RxJavaCommonPluginsTest.java
+++ b/common/src/test/java/io/reactivex/common/RxJavaCommonPluginsTest.java
@@ -846,7 +846,6 @@ public void uncaughtException(Thread t, Throwable e) {
* @throws Exception on error
*/
@Test
- @SuppressWarnings("rawtypes")
public void onErrorWithSuper() throws Exception {
try {
Consumer super Throwable> errorHandler = new Consumer() {
@@ -893,7 +892,6 @@ public Runnable apply(Runnable runnable) throws Exception {
}
}
- @SuppressWarnings({"rawtypes", "unchecked" })
@Test
public void clearIsPassthrough() {
try {
diff --git a/common/src/test/java/io/reactivex/common/SchedulerTest.java b/common/src/test/java/io/reactivex/common/SchedulerTest.java
index a85e60d..4890c51 100644
--- a/common/src/test/java/io/reactivex/common/SchedulerTest.java
+++ b/common/src/test/java/io/reactivex/common/SchedulerTest.java
@@ -282,4 +282,26 @@ public void holders() {
assertNotNull(new Schedulers.SingleHolder());
}
+
+ static final class CustomScheduler extends Scheduler {
+
+ @Override
+ public Worker createWorker() {
+ return Schedulers.single().createWorker();
+ }
+
+ }
+
+ @Test
+ public void customScheduleDirectDisposed() {
+ CustomScheduler scheduler = new CustomScheduler();
+
+ Disposable d = scheduler.scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MINUTES);
+
+ assertFalse(d.isDisposed());
+
+ d.dispose();
+
+ assertTrue(d.isDisposed());
+ }
}
diff --git a/common/src/test/java/io/reactivex/common/exceptions/ExceptionsTest.java b/common/src/test/java/io/reactivex/common/exceptions/ExceptionsTest.java
index 826ff8b..6f2ca85 100644
--- a/common/src/test/java/io/reactivex/common/exceptions/ExceptionsTest.java
+++ b/common/src/test/java/io/reactivex/common/exceptions/ExceptionsTest.java
@@ -15,7 +15,7 @@
*/
package io.reactivex.common.exceptions;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
import java.io.IOException;
@@ -87,4 +87,27 @@ public void manualPropagate() {
}
}
+ @Test
+ public void errorNotImplementedNull1() {
+ OnErrorNotImplementedException ex = new OnErrorNotImplementedException(null);
+
+ assertTrue("" + ex.getCause(), ex.getCause() instanceof NullPointerException);
+ }
+
+ @Test
+ public void errorNotImplementedNull2() {
+ OnErrorNotImplementedException ex = new OnErrorNotImplementedException("Message", null);
+
+ assertTrue("" + ex.getCause(), ex.getCause() instanceof NullPointerException);
+ }
+
+ @Test
+ public void errorNotImplementedWithCause() {
+ OnErrorNotImplementedException ex = new OnErrorNotImplementedException("Message", new TestException("Forced failure"));
+
+ assertTrue("" + ex.getCause(), ex.getCause() instanceof TestException);
+
+ assertEquals("" + ex.getCause(), "Forced failure", ex.getCause().getMessage());
+ }
+
}
diff --git a/common/src/test/java/io/reactivex/common/internal/functions/FunctionsTest.java b/common/src/test/java/io/reactivex/common/internal/functions/FunctionsTest.java
index 3b23322..36fbd96 100644
--- a/common/src/test/java/io/reactivex/common/internal/functions/FunctionsTest.java
+++ b/common/src/test/java/io/reactivex/common/internal/functions/FunctionsTest.java
@@ -16,10 +16,12 @@
import static org.junit.Assert.*;
import java.lang.reflect.Method;
+import java.util.List;
import org.junit.Test;
-import io.reactivex.common.TestCommonHelper;
+import io.reactivex.common.*;
+import io.reactivex.common.exceptions.TestException;
import io.reactivex.common.functions.*;
import io.reactivex.common.internal.functions.Functions.*;
import io.reactivex.common.internal.utils.ExceptionHelper;
@@ -245,4 +247,16 @@ public void emptyConsumerToString() {
assertEquals("EmptyConsumer", Functions.EMPTY_CONSUMER.toString());
}
+ @Test
+ public void errorConsumerEmpty() throws Exception {
+ List errors = TestCommonHelper.trackPluginErrors();
+ try {
+ Functions.ERROR_CONSUMER.accept(new TestException());
+
+ TestCommonHelper.assertUndeliverable(errors, 0, TestException.class);
+ assertEquals(errors.toString(), 1, errors.size());
+ } finally {
+ RxJavaCommonPlugins.reset();
+ }
+ }
}
diff --git a/common/src/test/java/io/reactivex/common/internal/schedulers/SingleSchedulerTest.java b/common/src/test/java/io/reactivex/common/internal/schedulers/SingleSchedulerTest.java
index f213a17..49f575c 100644
--- a/common/src/test/java/io/reactivex/common/internal/schedulers/SingleSchedulerTest.java
+++ b/common/src/test/java/io/reactivex/common/internal/schedulers/SingleSchedulerTest.java
@@ -15,12 +15,13 @@
import static org.junit.Assert.*;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import org.junit.Test;
import io.reactivex.common.*;
import io.reactivex.common.Scheduler.Worker;
+import io.reactivex.common.internal.disposables.SequentialDisposable;
import io.reactivex.common.internal.functions.Functions;
import io.reactivex.common.internal.schedulers.SingleScheduler.ScheduledWorker;
@@ -115,4 +116,64 @@ public void runnableDisposedAsyncTimed() throws Exception {
Thread.sleep(1);
}
}
+
+ @Test(timeout = 10000)
+ public void schedulePeriodicallyDirectZeroPeriod() throws Exception {
+ Scheduler s = Schedulers.single();
+
+ for (int initial = 0; initial < 2; initial++) {
+ final CountDownLatch cdl = new CountDownLatch(1);
+
+ final SequentialDisposable sd = new SequentialDisposable();
+
+ try {
+ sd.replace(s.schedulePeriodicallyDirect(new Runnable() {
+ int count;
+ @Override
+ public void run() {
+ if (++count == 10) {
+ sd.dispose();
+ cdl.countDown();
+ }
+ }
+ }, initial, 0, TimeUnit.MILLISECONDS));
+
+ assertTrue("" + initial, cdl.await(5, TimeUnit.SECONDS));
+ } finally {
+ sd.dispose();
+ }
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void schedulePeriodicallyZeroPeriod() throws Exception {
+ Scheduler s = Schedulers.single();
+
+ for (int initial = 0; initial < 2; initial++) {
+
+ final CountDownLatch cdl = new CountDownLatch(1);
+
+ final SequentialDisposable sd = new SequentialDisposable();
+
+ Scheduler.Worker w = s.createWorker();
+
+ try {
+ sd.replace(w.schedulePeriodically(new Runnable() {
+ int count;
+ @Override
+ public void run() {
+ if (++count == 10) {
+ sd.dispose();
+ cdl.countDown();
+ }
+ }
+ }, initial, 0, TimeUnit.MILLISECONDS));
+
+ assertTrue("" + initial, cdl.await(5, TimeUnit.SECONDS));
+ } finally {
+ sd.dispose();
+ w.dispose();
+ }
+ }
+ }
}
diff --git a/common/src/test/java/io/reactivex/common/internal/utils/ExceptionHelperTest.java b/common/src/test/java/io/reactivex/common/internal/utils/ExceptionHelperTest.java
index 032376b..c13ec66 100644
--- a/common/src/test/java/io/reactivex/common/internal/utils/ExceptionHelperTest.java
+++ b/common/src/test/java/io/reactivex/common/internal/utils/ExceptionHelperTest.java
@@ -46,4 +46,9 @@ public void run() {
TestCommonHelper.race(r, r, Schedulers.single());
}
}
+
+ @Test(expected = InternalError.class)
+ public void throwIfThrowable() throws Exception {
+ ExceptionHelper.throwIfThrowable(new InternalError());
+ }
}
diff --git a/common/src/test/java/io/reactivex/common/schedulers/AbstractSchedulerTests.java b/common/src/test/java/io/reactivex/common/schedulers/AbstractSchedulerTests.java
index 9b6ed4a..84bc4d1 100644
--- a/common/src/test/java/io/reactivex/common/schedulers/AbstractSchedulerTests.java
+++ b/common/src/test/java/io/reactivex/common/schedulers/AbstractSchedulerTests.java
@@ -25,6 +25,7 @@
import org.mockito.stubbing.Answer;
import io.reactivex.common.*;
+import io.reactivex.common.internal.disposables.SequentialDisposable;
import io.reactivex.common.internal.schedulers.TrampolineScheduler;
/**
@@ -340,4 +341,71 @@ public void run() {
}
assertTrue(d.isDisposed());
}
+
+ @Test(timeout = 10000)
+ public void schedulePeriodicallyDirectZeroPeriod() throws Exception {
+ Scheduler s = getScheduler();
+ if (s instanceof TrampolineScheduler) {
+ // can't properly stop a trampolined periodic task
+ return;
+ }
+
+ for (int initial = 0; initial < 2; initial++) {
+ final CountDownLatch cdl = new CountDownLatch(1);
+
+ final SequentialDisposable sd = new SequentialDisposable();
+
+ try {
+ sd.replace(s.schedulePeriodicallyDirect(new Runnable() {
+ int count;
+ @Override
+ public void run() {
+ if (++count == 10) {
+ sd.dispose();
+ cdl.countDown();
+ }
+ }
+ }, initial, 0, TimeUnit.MILLISECONDS));
+
+ assertTrue("" + initial, cdl.await(5, TimeUnit.SECONDS));
+ } finally {
+ sd.dispose();
+ }
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void schedulePeriodicallyZeroPeriod() throws Exception {
+ Scheduler s = getScheduler();
+ if (s instanceof TrampolineScheduler) {
+ // can't properly stop a trampolined periodic task
+ return;
+ }
+
+ for (int initial = 0; initial < 2; initial++) {
+ final CountDownLatch cdl = new CountDownLatch(1);
+
+ final SequentialDisposable sd = new SequentialDisposable();
+
+ Scheduler.Worker w = s.createWorker();
+
+ try {
+ sd.replace(w.schedulePeriodically(new Runnable() {
+ int count;
+ @Override
+ public void run() {
+ if (++count == 10) {
+ sd.dispose();
+ cdl.countDown();
+ }
+ }
+ }, initial, 0, TimeUnit.MILLISECONDS));
+
+ assertTrue("" + initial, cdl.await(5, TimeUnit.SECONDS));
+ } finally {
+ sd.dispose();
+ w.dispose();
+ }
+ }
+ }
}
diff --git a/flowable/src/main/java/io/reactivex/flowable/ConnectableFlowable.java b/flowable/src/main/java/io/reactivex/flowable/ConnectableFlowable.java
index cd5af38..fb0badf 100644
--- a/flowable/src/main/java/io/reactivex/flowable/ConnectableFlowable.java
+++ b/flowable/src/main/java/io/reactivex/flowable/ConnectableFlowable.java
@@ -109,7 +109,7 @@ public Flowable autoConnect(int numberOfSubscribers) {
* @param numberOfSubscribers the number of subscribers to await before calling connect
* on the ConnectableObservable. A non-positive value indicates
* an immediate connection.
- * @param connection the callback Action1 that will receive the Subscription representing the
+ * @param connection the callback Consumer that will receive the Subscription representing the
* established connection
* @return an Observable that automatically connects to this ConnectableObservable
* when the specified number of Subscribers subscribe to it and calls the
diff --git a/flowable/src/main/java/io/reactivex/flowable/Flowable.java b/flowable/src/main/java/io/reactivex/flowable/Flowable.java
index 59e80c5..b622b20 100644
--- a/flowable/src/main/java/io/reactivex/flowable/Flowable.java
+++ b/flowable/src/main/java/io/reactivex/flowable/Flowable.java
@@ -145,6 +145,9 @@ public static int bufferSize() {
* If any of the sources never produces an item but only terminates (normally or with an error), the
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
+ *
+ * If the provided array of source Publishers is empty, the resulting sequence completes immediately without emitting
+ * any items and without any calls to the combiner function.
*
*
* - Backpressure:
@@ -186,6 +189,9 @@ public static Flowable combineLatest(Publisher extends T>[] sources,
* If any of the sources never produces an item but only terminates (normally or with an error), the
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
+ *
+ * If there are no source Publishers provided, the resulting sequence completes immediately without emitting
+ * any items and without any calls to the combiner function.
*
*
* - Backpressure:
@@ -227,6 +233,9 @@ public static Flowable combineLatest(Function super Object[], ? exte
* If any of the sources never produces an item but only terminates (normally or with an error), the
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
+ *
+ * If the provided array of source Publishers is empty, the resulting sequence completes immediately without emitting
+ * any items and without any calls to the combiner function.
*
*
* - Backpressure:
@@ -276,6 +285,9 @@ public static Flowable combineLatest(Publisher extends T>[] sources,
* If any of the sources never produces an item but only terminates (normally or with an error), the
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
+ *
+ * If the provided iterable of source Publishers is empty, the resulting sequence completes immediately without emitting
+ * any items and without any calls to the combiner function.
*
*
* - Backpressure:
@@ -318,6 +330,9 @@ public static Flowable combineLatest(Iterable extends Publisher ex
* If any of the sources never produces an item but only terminates (normally or with an error), the
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
+ *
+ * If the provided iterable of source Publishers is empty, the resulting sequence completes immediately without emitting
+ * any items and without any calls to the combiner function.
*
*
* - Backpressure:
@@ -365,6 +380,9 @@ public static Flowable combineLatest(Iterable extends Publisher ex
* If any of the sources never produces an item but only terminates (normally or with an error), the
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
+ *
+ * If the provided array of source Publishers is empty, the resulting sequence completes immediately without emitting
+ * any items and without any calls to the combiner function.
*
*
* - Backpressure:
@@ -408,6 +426,9 @@ public static Flowable combineLatestDelayError(Publisher extends T>[
* If any of the sources never produces an item but only terminates (normally or with an error), the
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
+ *
+ * If the provided array of source Publishers is empty, the resulting sequence completes immediately without emitting
+ * any items and without any calls to the combiner function.
*
*
* - Backpressure:
@@ -451,6 +472,9 @@ public static Flowable combineLatestDelayError(Function super Object
* If any of the sources never produces an item but only terminates (normally or with an error), the
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
+ *
+ * If there are no source Publishers provided, the resulting sequence completes immediately without emitting
+ * any items and without any calls to the combiner function.
*
*
* - Backpressure:
@@ -496,6 +520,9 @@ public static Flowable combineLatestDelayError(Function super Object
* If any of the sources never produces an item but only terminates (normally or with an error), the
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
+ *
+ * If the provided array of source Publishers is empty, the resulting sequence completes immediately without emitting
+ * any items and without any calls to the combiner function.
*
*
* - Backpressure:
@@ -547,6 +574,9 @@ public static Flowable combineLatestDelayError(Publisher extends T>[
* If any of the sources never produces an item but only terminates (normally or with an error), the
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
+ *
+ * If the provided iterable of source Publishers is empty, the resulting sequence completes immediately without emitting
+ * any items and without any calls to the combiner function.
*
*
* - Backpressure:
@@ -590,6 +620,9 @@ public static Flowable combineLatestDelayError(Iterable extends Publ
* If any of the sources never produces an item but only terminates (normally or with an error), the
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
* If that input source is also synchronous, other sources after it will not be subscribed to.
+ *
+ * If the provided iterable of source Publishers is empty, the resulting sequence completes immediately without emitting
+ * any items and without any calls to the combiner function.
*
*
* - Backpressure:
@@ -1313,7 +1346,7 @@ public static Flowable concat(
/**
* Concatenates a variable number of Publisher sources.
*
- * Note: named this way because of overload conflict with concat(Publisher<Publisher>).
+ * Note: named this way because of overload conflict with concat(Publisher<Publisher>).
*
*
*
@@ -2083,7 +2116,6 @@ public static Flowable fromPublisher(final Publisher extends T> source)
/**
* Returns a cold, synchronous, stateless and backpressure-aware generator of values.
- *
*
* - Backpressure:
* - The operator honors downstream backpressure.
@@ -2110,7 +2142,6 @@ public static Flowable generate(final Consumer> generator) {
/**
* Returns a cold, synchronous, stateful and backpressure-aware generator of values.
- *
*
* - Backpressure:
* - The operator honors downstream backpressure.
@@ -2138,7 +2169,6 @@ public static Flowable generate(Callable initialState, final BiCons
/**
* Returns a cold, synchronous, stateful and backpressure-aware generator of values.
- *
*
* - Backpressure:
* - The operator honors downstream backpressure.
@@ -2168,7 +2198,6 @@ public static Flowable generate(Callable initialState, final BiCons
/**
* Returns a cold, synchronous, stateful and backpressure-aware generator of values.
- *
*
* - Backpressure:
* - The operator honors downstream backpressure.
@@ -2195,7 +2224,6 @@ public static Flowable generate(Callable initialState, BiFunction
*
* - Backpressure:
* - The operator honors downstream backpressure.
@@ -4168,7 +4196,7 @@ public static Flowable using(Callable extends D> resourceSupplier,
*
*
*
- * - Backpressure:
-
+ *
- Backpressure:
* - The operator expects backpressure from the sources and honors backpressure from the downstream.
* (I.e., zipping with {@link #interval(long, TimeUnit)} may result in MissingBackpressureException, use
* one of the {@code onBackpressureX} to handle similar, backpressure-ignoring sources.
@@ -4221,7 +4249,7 @@ public static Flowable zip(Iterable extends Publisher extends T>>
*
*
*
- * - Backpressure:
-
+ *
- Backpressure:
* - The operator expects backpressure from the sources and honors backpressure from the downstream.
* (I.e., zipping with {@link #interval(long, TimeUnit)} may result in MissingBackpressureException, use
* one of the {@code onBackpressureX} to handle similar, backpressure-ignoring sources.
@@ -4276,7 +4304,7 @@ public static Flowable zip(Publisher extends Publisher extends T>>
* use {@link #doOnCancel(Action)} as well or use {@code using()} to do cleanup in case of completion
* or cancellation.
*
- * - Backpressure:
-
+ *
- Backpressure:
* - The operator expects backpressure from the sources and honors backpressure from the downstream.
* (I.e., zipping with {@link #interval(long, TimeUnit)} may result in MissingBackpressureException, use
* one of the {@code onBackpressureX} to handle similar, backpressure-ignoring sources.
@@ -4336,7 +4364,7 @@ public static Flowable zip(
* use {@link #doOnCancel(Action)} as well or use {@code using()} to do cleanup in case of completion
* or cancellation.
*
- * - Backpressure:
-
+ *
- Backpressure:
* - The operator expects backpressure from the sources and honors backpressure from the downstream.
* (I.e., zipping with {@link #interval(long, TimeUnit)} may result in MissingBackpressureException, use
* one of the {@code onBackpressureX} to handle similar, backpressure-ignoring sources.
@@ -4398,7 +4426,7 @@ public static Flowable zip(
* use {@link #doOnCancel(Action)} as well or use {@code using()} to do cleanup in case of completion
* or cancellation.
*
- * - Backpressure:
-
+ *
- Backpressure:
* - The operator expects backpressure from the sources and honors backpressure from the downstream.
* (I.e., zipping with {@link #interval(long, TimeUnit)} may result in MissingBackpressureException, use
* one of the {@code onBackpressureX} to handle similar, backpressure-ignoring sources.
@@ -4461,7 +4489,7 @@ public static Flowable zip(
* use {@link #doOnCancel(Action)} as well or use {@code using()} to do cleanup in case of completion
* or cancellation.
*
- * - Backpressure:
-
+ *
- Backpressure:
* - The operator expects backpressure from the sources and honors backpressure from the downstream.
* (I.e., zipping with {@link #interval(long, TimeUnit)} may result in MissingBackpressureException, use
* one of the {@code onBackpressureX} to handle similar, backpressure-ignoring sources.
@@ -4526,7 +4554,7 @@ public static Flowable zip(
* use {@link #doOnCancel(Action)} as well or use {@code using()} to do cleanup in case of completion
* or cancellation.
*
- * - Backpressure:
-
+ *
- Backpressure:
* - The operator expects backpressure from the sources and honors backpressure from the downstream.
* (I.e., zipping with {@link #interval(long, TimeUnit)} may result in MissingBackpressureException, use
* one of the {@code onBackpressureX} to handle similar, backpressure-ignoring sources.
@@ -4596,7 +4624,7 @@ public static Flowable zip(
* use {@link #doOnCancel(Action)} as well or use {@code using()} to do cleanup in case of completion
* or cancellation.
*
- * - Backpressure:
-
+ *
- Backpressure:
* - The operator expects backpressure from the sources and honors backpressure from the downstream.
* (I.e., zipping with {@link #interval(long, TimeUnit)} may result in MissingBackpressureException, use
* one of the {@code onBackpressureX} to handle similar, backpressure-ignoring sources.
@@ -4669,7 +4697,7 @@ public static Flowable zip(
* use {@link #doOnCancel(Action)} as well or use {@code using()} to do cleanup in case of completion
* or cancellation.
*
- * - Backpressure:
-
+ *
- Backpressure:
* - The operator expects backpressure from the sources and honors backpressure from the downstream.
* (I.e., zipping with {@link #interval(long, TimeUnit)} may result in MissingBackpressureException, use
* one of the {@code onBackpressureX} to handle similar, backpressure-ignoring sources.
@@ -4746,7 +4774,7 @@ public static Flowable zip(
* use {@link #doOnCancel(Action)} as well or use {@code using()} to do cleanup in case of completion
* or cancellation.
*
- * - Backpressure:
-
+ *
- Backpressure:
* - The operator expects backpressure from the sources and honors backpressure from the downstream.
* (I.e., zipping with {@link #interval(long, TimeUnit)} may result in MissingBackpressureException, use
* one of the {@code onBackpressureX} to handle similar, backpressure-ignoring sources.
@@ -4828,7 +4856,7 @@ public static Flowable zip(
* use {@link #doOnCancel(Action)} as well or use {@code using()} to do cleanup in case of completion
* or cancellation.
*
- * - Backpressure:
-
+ *
- Backpressure:
* - The operator expects backpressure from the sources and honors backpressure from the downstream.
* (I.e., zipping with {@link #interval(long, TimeUnit)} may result in MissingBackpressureException, use
* one of the {@code onBackpressureX} to handle similar, backpressure-ignoring sources.
@@ -4914,7 +4942,7 @@ public static Flowable zip(
* use {@link #doOnCancel(Action)} as well or use {@code using()} to do cleanup in case of completion
* or cancellation.
*
- * - Backpressure:
-
+ *
- Backpressure:
* - The operator expects backpressure from the sources and honors backpressure from the downstream.
* (I.e., zipping with {@link #interval(long, TimeUnit)} may result in MissingBackpressureException, use
* one of the {@code onBackpressureX} to handle similar, backpressure-ignoring sources.
@@ -5005,7 +5033,7 @@ public static Flowable zip(
*
*
*
- * - Backpressure:
-
+ *
- Backpressure:
* - The operator expects backpressure from the sources and honors backpressure from the downstream.
* (I.e., zipping with {@link #interval(long, TimeUnit)} may result in MissingBackpressureException, use
* one of the {@code onBackpressureX} to handle similar, backpressure-ignoring sources.
@@ -5066,7 +5094,7 @@ public static Flowable zipArray(Function super Object[], ? extends R
*
*
*
- * - Backpressure:
-
+ *
- Backpressure:
* - The operator expects backpressure from the sources and honors backpressure from the downstream.
* (I.e., zipping with {@link #interval(long, TimeUnit)} may result in MissingBackpressureException, use
* one of the {@code onBackpressureX} to handle similar, backpressure-ignoring sources.
@@ -5579,7 +5607,6 @@ public final void blockingSubscribe() {
* If the Flowable emits an error, it is wrapped into an
* {@link io.reactivex.common.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaFlowablePlugins.onError handler.
- *
*
* - Backpressure:
* - The operator consumes the source {@code Flowable} in an unbounded manner
@@ -5638,7 +5665,6 @@ public final void blockingSubscribe(Consumer super T> onNext, Consumer super
/**
* Subscribes to the source and calls the Subscriber methods on the current thread.
- *
*
* - Backpressure:
* - The supplied {@code Subscriber} determines how backpressure is applied.
@@ -6871,8 +6897,6 @@ public final Flowable concatMapEagerDelayError(Function super T, ? exte
/**
* Returns a Flowable that concatenate each item emitted by the source Publisher with the values in an
* Iterable corresponding to that item that is generated by a selector.
- *
- *
*
* - Backpressure:
* - The operator honors backpressure from downstream. The source {@code Publisher}s is
@@ -6901,8 +6925,6 @@ public final Flowable concatMapIterable(Function super T, ? extends Ite
/**
* Returns a Flowable that concatenate each item emitted by the source Publisher with the values in an
* Iterable corresponding to that item that is generated by a selector.
- *
- *
*
* - Backpressure:
* - The operator honors backpressure from downstream. The source {@code Publisher}s is
@@ -7053,7 +7075,6 @@ public final Flowable debounce(Function super T, ? extends Publisher
*
*
* Information on debounce vs throttle:
- *
*