Skip to content

Commit

Permalink
Merge pull request #2471 from jnlopar/fix1702
Browse files Browse the repository at this point in the history
Fixes NPEs reported in #1702 by synchronizing queue.
  • Loading branch information
akarnokd committed Jan 23, 2015
2 parents cf5ae70 + 63da8b1 commit 4fbf11a
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 17 deletions.
29 changes: 12 additions & 17 deletions src/main/java/rx/schedulers/TrampolineScheduler.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2014 Netflix, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -15,7 +15,7 @@
*/
package rx.schedulers;

import java.util.PriorityQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand Down Expand Up @@ -45,12 +45,11 @@ public Worker createWorker() {
/* package accessible for unit tests */TrampolineScheduler() {
}

volatile int counter;
static final AtomicIntegerFieldUpdater<TrampolineScheduler> COUNTER_UPDATER = AtomicIntegerFieldUpdater.newUpdater(TrampolineScheduler.class, "counter");
private static class InnerCurrentThreadScheduler extends Scheduler.Worker implements Subscription {

private class InnerCurrentThreadScheduler extends Scheduler.Worker implements Subscription {

final PriorityQueue<TimedAction> queue = new PriorityQueue<TimedAction>();
private static final AtomicIntegerFieldUpdater COUNTER_UPDATER = AtomicIntegerFieldUpdater.newUpdater(InnerCurrentThreadScheduler.class, "counter");
volatile int counter;
private final PriorityBlockingQueue<TimedAction> queue = new PriorityBlockingQueue<TimedAction>();
private final BooleanSubscription innerSubscription = new BooleanSubscription();
private final AtomicInteger wip = new AtomicInteger();

Expand All @@ -70,13 +69,12 @@ private Subscription enqueue(Action0 action, long execTime) {
if (innerSubscription.isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
final TimedAction timedAction = new TimedAction(action, execTime, COUNTER_UPDATER.incrementAndGet(TrampolineScheduler.this));
final TimedAction timedAction = new TimedAction(action, execTime, COUNTER_UPDATER.incrementAndGet(this));
queue.add(timedAction);

if (wip.getAndIncrement() == 0) {
do {
TimedAction polled = queue.poll();
// check for null as it could have been unsubscribed and removed
final TimedAction polled = queue.poll();
if (polled != null) {
polled.action.call();
}
Expand All @@ -88,10 +86,7 @@ private Subscription enqueue(Action0 action, long execTime) {

@Override
public void call() {
PriorityQueue<TimedAction> _q = queue;
if (_q != null) {
_q.remove(timedAction);
}
queue.remove(timedAction);
}

});
Expand Down Expand Up @@ -130,7 +125,7 @@ public int compareTo(TimedAction that) {
return result;
}
}

// because I can't use Integer.compare from Java 7
private static int compare(int x, int y) {
return (x < y) ? -1 : ((x == y) ? 0 : 1);
Expand Down
45 changes: 45 additions & 0 deletions src/test/java/rx/schedulers/TrampolineSchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
import static org.junit.Assert.*;

import java.util.*;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

import rx.*;
import rx.Observer;
import rx.Scheduler.Worker;
import rx.Observable;
import rx.functions.*;
import rx.observers.Observers;
import rx.observers.TestSubscriber;
import rx.subscriptions.CompositeSubscription;

public class TrampolineSchedulerTest extends AbstractSchedulerTests {
Expand Down Expand Up @@ -95,6 +99,47 @@ public void call() {
}
}

/**
* This is a regression test for #1702. Concurrent work scheduling that is improperly synchronized can cause an
* action to be added or removed onto the priority queue during a poll, which can result in NPEs during queue
* sifting. While it is difficult to isolate the issue directly, we can easily trigger the behavior by spamming the
* trampoline with enqueue requests from multiple threads concurrently.
*/
@Test
public void testTrampolineWorkerHandlesConcurrentScheduling() {
final Worker trampolineWorker = Schedulers.trampoline().createWorker();
final Observer<Subscription> observer = Observers.empty();
final TestSubscriber<Subscription> ts = new TestSubscriber<Subscription>(observer);

// Spam the trampoline with actions.
Observable.range(0, 50)
.flatMap(new Func1<Integer, Observable<Subscription>>() {

@Override
public Observable<Subscription> call(Integer count) {
return Observable.interval(1, TimeUnit.MICROSECONDS).map(
new Func1<Long, Subscription>() {

@Override
public Subscription call(Long count) {
return trampolineWorker.schedule(new Action0() {

@Override
public void call() {}

});
}

}).limit(100);
}

})
.subscribeOn(Schedulers.computation())
.subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
}

private static Worker doWorkOnNewTrampoline(final String key, final ArrayList<String> workDone) {
Worker worker = Schedulers.trampoline().createWorker();
worker.schedule(new Action0() {
Expand Down

0 comments on commit 4fbf11a

Please sign in to comment.