Skip to content

Commit

Permalink
Feature/#4876 nonnull annotations (#5051)
Browse files Browse the repository at this point in the history
* add @nullable annotations to RxJavaPlugins

* add @nonnull annotations to schedulers

* javadoc for NonNull/Nullable annotations
  • Loading branch information
jschneider authored and akarnokd committed Feb 3, 2017
1 parent 0b51d41 commit 8720828
Show file tree
Hide file tree
Showing 24 changed files with 349 additions and 132 deletions.
39 changes: 25 additions & 14 deletions src/main/java/io/reactivex/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.TimeUnit;

import io.reactivex.annotations.Experimental;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
Expand Down Expand Up @@ -61,6 +62,7 @@ public static long clockDriftTolerance() {
*
* @return a Worker representing a serial queue of actions to be executed
*/
@NonNull
public abstract Worker createWorker();

/**
Expand All @@ -69,7 +71,7 @@ public static long clockDriftTolerance() {
* @return the 'current time'
* @since 2.0
*/
public long now(TimeUnit unit) {
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

Expand Down Expand Up @@ -105,7 +107,8 @@ public void shutdown() {
* @return the Disposable instance that let's one cancel this particular task.
* @since 2.0
*/
public Disposable scheduleDirect(Runnable run) {
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

Expand All @@ -122,7 +125,8 @@ public Disposable scheduleDirect(Runnable run) {
* @return the Disposable that let's one cancel this particular delayed task.
* @since 2.0
*/
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
Expand Down Expand Up @@ -159,7 +163,8 @@ public void run() {
* @return the Disposable that let's one cancel this particular delayed task.
* @since 2.0
*/
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
@NonNull
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
final Worker w = createWorker();

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
Expand Down Expand Up @@ -249,7 +254,8 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo
*/
@SuppressWarnings("unchecked")
@Experimental
public <S extends Scheduler & Disposable> S when(Function<Flowable<Flowable<Completable>>, Completable> combine) {
@NonNull
public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) {
return (S) new SchedulerWhen(combine, this);
}

Expand All @@ -268,7 +274,8 @@ public abstract static class Worker implements Disposable {
* Runnable to schedule
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
*/
public Disposable schedule(Runnable run) {
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}

Expand All @@ -287,7 +294,8 @@ public Disposable schedule(Runnable run) {
* the time unit of {@code delayTime}
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
*/
public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit);
@NonNull
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);

/**
* Schedules a cancelable action to be executed periodically. This default implementation schedules
Expand All @@ -309,7 +317,8 @@ public Disposable schedule(Runnable run) {
* the time unit of {@code period}
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
*/
public Disposable schedulePeriodically(Runnable run, final long initialDelay, final long period, final TimeUnit unit) {
@NonNull
public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
final SequentialDisposable first = new SequentialDisposable();

final SequentialDisposable sd = new SequentialDisposable(first);
Expand Down Expand Up @@ -337,7 +346,7 @@ public Disposable schedulePeriodically(Runnable run, final long initialDelay, fi
* @return the 'current time'
* @since 2.0
*/
public long now(TimeUnit unit) {
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

Expand All @@ -346,15 +355,17 @@ public long now(TimeUnit unit) {
* of this task has to happen (accounting for clock drifts).
*/
final class PeriodicTask implements Runnable {
@NonNull
final Runnable decoratedRun;
@NonNull
final SequentialDisposable sd;
final long periodInNanoseconds;
long count;
long lastNowNanoseconds;
long startInNanoseconds;

PeriodicTask(long firstStartInNanoseconds, Runnable decoratedRun,
long firstNowNanoseconds, SequentialDisposable sd, long periodInNanoseconds) {
PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,
long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) {
this.decoratedRun = decoratedRun;
this.sd = sd;
this.periodInNanoseconds = periodInNanoseconds;
Expand Down Expand Up @@ -395,12 +406,12 @@ public void run() {
static class PeriodicDirectTask
implements Runnable, Disposable {
final Runnable run;

@NonNull
final Worker worker;

@NonNull
volatile boolean disposed;

PeriodicDirectTask(Runnable run, Worker worker) {
PeriodicDirectTask(@NonNull Runnable run, @NonNull Worker worker) {
this.run = run;
this.worker = worker;
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/reactivex/annotations/NonNull.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.CLASS;

/**
* Indicates that a field/parameter/variable/return type is never null.
*/
@Documented
@Target(value = {FIELD, METHOD, PARAMETER, LOCAL_VARIABLE})
@Retention(value = CLASS)
Expand Down
33 changes: 33 additions & 0 deletions src/main/java/io/reactivex/annotations/Nullable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.annotations;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.LOCAL_VARIABLE;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.CLASS;

/**
* Indicates that a field/parameter/variable/return type may be null.
*/
@Documented
@Target(value = {FIELD, METHOD, PARAMETER, LOCAL_VARIABLE})
@Retention(value = CLASS)
public @interface Nullable { }

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.reactivex.internal.schedulers;

import io.reactivex.Scheduler;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.*;
import io.reactivex.internal.disposables.*;

Expand Down Expand Up @@ -118,19 +119,22 @@ public ComputationScheduler(ThreadFactory threadFactory) {
start();
}

@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get().getEventLoop());
}

@NonNull
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
PoolWorker w = pool.get().getEventLoop();
return w.scheduleDirect(run, delay, unit);
}

@NonNull
@Override
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
PoolWorker w = pool.get().getEventLoop();
return w.schedulePeriodicallyDirect(run, initialDelay, period, unit);
}
Expand Down Expand Up @@ -188,16 +192,18 @@ public boolean isDisposed() {
return disposed;
}

@NonNull
@Override
public Disposable schedule(Runnable action) {
public Disposable schedule(@NonNull Runnable action) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}

return poolWorker.scheduleActual(action, 0, null, serial);
}
@NonNull
@Override
public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.atomic.*;

import io.reactivex.Scheduler;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.*;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.queue.MpscLinkedQueue;
Expand All @@ -29,21 +30,24 @@
*/
public final class ExecutorScheduler extends Scheduler {

@NonNull
final Executor executor;

static final Scheduler HELPER = Schedulers.single();

public ExecutorScheduler(Executor executor) {
public ExecutorScheduler(@NonNull Executor executor) {
this.executor = executor;
}

@NonNull
@Override
public Worker createWorker() {
return new ExecutorWorker(executor);
}

@NonNull
@Override
public Disposable scheduleDirect(Runnable run) {
public Disposable scheduleDirect(@NonNull Runnable run) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
try {
if (executor instanceof ExecutorService) {
Expand All @@ -60,8 +64,9 @@ public Disposable scheduleDirect(Runnable run) {
}
}

@NonNull
@Override
public Disposable scheduleDirect(Runnable run, final long delay, final TimeUnit unit) {
public Disposable scheduleDirect(@NonNull Runnable run, final long delay, final TimeUnit unit) {
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
if (executor instanceof ScheduledExecutorService) {
try {
Expand All @@ -87,8 +92,9 @@ public void run() {
return dr;
}

@NonNull
@Override
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
if (executor instanceof ScheduledExecutorService) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
try {
Expand Down Expand Up @@ -118,8 +124,9 @@ public ExecutorWorker(Executor executor) {
this.queue = new MpscLinkedQueue<Runnable>();
}

@NonNull
@Override
public Disposable schedule(Runnable run) {
public Disposable schedule(@NonNull Runnable run) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
Expand All @@ -143,8 +150,9 @@ public Disposable schedule(Runnable run) {
return br;
}

@NonNull
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
if (delay <= 0) {
return schedule(run);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.TimeUnit;

import io.reactivex.Scheduler;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.*;

/**
Expand Down Expand Up @@ -45,22 +46,26 @@ private ImmediateThinScheduler() {
// singleton class
}

@NonNull
@Override
public Disposable scheduleDirect(Runnable run) {
public Disposable scheduleDirect(@NonNull Runnable run) {
run.run();
return DISPOSED;
}

@NonNull
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
throw new UnsupportedOperationException("This scheduler doesn't support delayed execution");
}

@NonNull
@Override
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
throw new UnsupportedOperationException("This scheduler doesn't support periodic execution");
}

@NonNull
@Override
public Worker createWorker() {
return WORKER;
Expand All @@ -78,19 +83,22 @@ public boolean isDisposed() {
return false; // dispose() has no effect
}

@NonNull
@Override
public Disposable schedule(Runnable run) {
public Disposable schedule(@NonNull Runnable run) {
run.run();
return DISPOSED;
}

@NonNull
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
throw new UnsupportedOperationException("This scheduler doesn't support delayed execution");
}

@NonNull
@Override
public Disposable schedulePeriodically(Runnable run, long initialDelay, long period, TimeUnit unit) {
public Disposable schedulePeriodically(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
throw new UnsupportedOperationException("This scheduler doesn't support periodic execution");
}
}
Expand Down
Loading

0 comments on commit 8720828

Please sign in to comment.