Skip to content

Commit

Permalink
2.x: fix periodic scheduling with negative period causing IAE (#5419)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Jun 17, 2017
1 parent f64fdd9 commit b7086ef
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.internal.schedulers;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.disposables.Disposable;
import io.reactivex.internal.functions.Functions;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Wrapper for a regular task that gets immediately rescheduled when the task completed.
*/
final class InstantPeriodicTask implements Callable<Void>, Disposable {

final Runnable task;

final AtomicReference<Future<?>> rest;

final AtomicReference<Future<?>> first;

final ExecutorService executor;

Thread runner;

static final FutureTask<Void> CANCELLED = new FutureTask<Void>(Functions.EMPTY_RUNNABLE, null);

InstantPeriodicTask(Runnable task, ExecutorService executor) {
super();
this.task = task;
this.first = new AtomicReference<Future<?>>();
this.rest = new AtomicReference<Future<?>>();
this.executor = executor;
}

@Override
public Void call() throws Exception {
try {
runner = Thread.currentThread();
try {
task.run();
setRest(executor.submit(this));
} catch (Throwable ex) {
RxJavaPlugins.onError(ex);
}
} finally {
runner = null;
}
return null;
}

@Override
public void dispose() {
Future<?> current = first.getAndSet(CANCELLED);
if (current != null && current != CANCELLED) {
current.cancel(runner != Thread.currentThread());
}
current = rest.getAndSet(CANCELLED);
if (current != null && current != CANCELLED) {
current.cancel(runner != Thread.currentThread());
}
}

@Override
public boolean isDisposed() {
return first.get() == CANCELLED;
}

void setFirst(Future<?> f) {
for (;;) {
Future<?> current = first.get();
if (current == CANCELLED) {
f.cancel(runner != Thread.currentThread());
}
if (first.compareAndSet(current, f)) {
return;
}
}
}

void setRest(Future<?> f) {
for (;;) {
Future<?> current = rest.get();
if (current == CANCELLED) {
f.cancel(runner != Thread.currentThread());
}
if (rest.compareAndSet(current, f)) {
return;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,27 @@ public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit un
* @param unit the time unit for both the initialDelay and period
* @return the ScheduledRunnable instance
*/
public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDelay, long period, TimeUnit unit) {
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(RxJavaPlugins.onSchedule(run));
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
if (period <= 0L) {

InstantPeriodicTask periodicWrapper = new InstantPeriodicTask(decoratedRun, executor);
try {
Future<?> f;
if (initialDelay <= 0L) {
f = executor.submit(periodicWrapper);
} else {
f = executor.schedule(periodicWrapper, initialDelay, unit);
}
periodicWrapper.setFirst(f);
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}

return periodicWrapper;
}
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
try {
Future<?> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
task.setFuture(f);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,28 @@ public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit uni
@NonNull
@Override
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(RxJavaPlugins.onSchedule(run));
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
if (period <= 0L) {

ScheduledExecutorService exec = executor.get();

InstantPeriodicTask periodicWrapper = new InstantPeriodicTask(decoratedRun, exec);
Future<?> f;
try {
if (initialDelay <= 0L) {
f = exec.submit(periodicWrapper);
} else {
f = exec.schedule(periodicWrapper, initialDelay, unit);
}
periodicWrapper.setFirst(f);
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}

return periodicWrapper;
}
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
try {
Future<?> f = executor.get().scheduleAtFixedRate(task, initialDelay, period, unit);
task.setFuture(f);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.reactivex.*;
import io.reactivex.Scheduler.Worker;
import io.reactivex.disposables.*;
import io.reactivex.internal.disposables.SequentialDisposable;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.schedulers.SingleScheduler.ScheduledWorker;
import io.reactivex.schedulers.Schedulers;
Expand Down Expand Up @@ -117,4 +118,64 @@ public void runnableDisposedAsyncTimed() throws Exception {
Thread.sleep(1);
}
}

@Test(timeout = 10000)
public void schedulePeriodicallyDirectZeroPeriod() throws Exception {
Scheduler s = Schedulers.single();

for (int initial = 0; initial < 2; initial++) {
final CountDownLatch cdl = new CountDownLatch(1);

final SequentialDisposable sd = new SequentialDisposable();

try {
sd.replace(s.schedulePeriodicallyDirect(new Runnable() {
int count;
@Override
public void run() {
if (++count == 10) {
sd.dispose();
cdl.countDown();
}
}
}, initial, 0, TimeUnit.MILLISECONDS));

assertTrue("" + initial, cdl.await(5, TimeUnit.SECONDS));
} finally {
sd.dispose();
}
}
}

@Test(timeout = 10000)
public void schedulePeriodicallyZeroPeriod() throws Exception {
Scheduler s = Schedulers.single();

for (int initial = 0; initial < 2; initial++) {

final CountDownLatch cdl = new CountDownLatch(1);

final SequentialDisposable sd = new SequentialDisposable();

Scheduler.Worker w = s.createWorker();

try {
sd.replace(w.schedulePeriodically(new Runnable() {
int count;
@Override
public void run() {
if (++count == 10) {
sd.dispose();
cdl.countDown();
}
}
}, initial, 0, TimeUnit.MILLISECONDS));

assertTrue("" + initial, cdl.await(5, TimeUnit.SECONDS));
} finally {
sd.dispose();
w.dispose();
}
}
}
}
68 changes: 68 additions & 0 deletions src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.*;
import io.reactivex.internal.disposables.SequentialDisposable;
import io.reactivex.internal.schedulers.TrampolineScheduler;
import io.reactivex.internal.subscriptions.*;
import io.reactivex.subscribers.DefaultSubscriber;
Expand Down Expand Up @@ -558,4 +559,71 @@ public void run() {
}
assertTrue(d.isDisposed());
}

@Test(timeout = 10000)
public void schedulePeriodicallyDirectZeroPeriod() throws Exception {
Scheduler s = getScheduler();
if (s instanceof TrampolineScheduler) {
// can't properly stop a trampolined periodic task
return;
}

for (int initial = 0; initial < 2; initial++) {
final CountDownLatch cdl = new CountDownLatch(1);

final SequentialDisposable sd = new SequentialDisposable();

try {
sd.replace(s.schedulePeriodicallyDirect(new Runnable() {
int count;
@Override
public void run() {
if (++count == 10) {
sd.dispose();
cdl.countDown();
}
}
}, initial, 0, TimeUnit.MILLISECONDS));

assertTrue("" + initial, cdl.await(5, TimeUnit.SECONDS));
} finally {
sd.dispose();
}
}
}

@Test(timeout = 10000)
public void schedulePeriodicallyZeroPeriod() throws Exception {
Scheduler s = getScheduler();
if (s instanceof TrampolineScheduler) {
// can't properly stop a trampolined periodic task
return;
}

for (int initial = 0; initial < 2; initial++) {
final CountDownLatch cdl = new CountDownLatch(1);

final SequentialDisposable sd = new SequentialDisposable();

Scheduler.Worker w = s.createWorker();

try {
sd.replace(w.schedulePeriodically(new Runnable() {
int count;
@Override
public void run() {
if (++count == 10) {
sd.dispose();
cdl.countDown();
}
}
}, initial, 0, TimeUnit.MILLISECONDS));

assertTrue("" + initial, cdl.await(5, TimeUnit.SECONDS));
} finally {
sd.dispose();
w.dispose();
}
}
}
}

0 comments on commit b7086ef

Please sign in to comment.