From c74e1b397b7caa3771c679eaf48b4d95c18cded4 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Wed, 24 Feb 2016 11:11:47 +1100 Subject: [PATCH] fix #40 and prevent request overflow by using BackpressureUtils --- pom.xml | 1 - .../davidmoten/rtree/OnSubscribeSearch.java | 8 ++-- .../github/davidmoten/rtree/RTreeTest.java | 39 ++++++++++--------- 3 files changed, 25 insertions(+), 23 deletions(-) diff --git a/pom.xml b/pom.xml index e293bbed..11559072 100644 --- a/pom.xml +++ b/pom.xml @@ -119,7 +119,6 @@ com.github.davidmoten rxjava-extras 0.7 - test diff --git a/src/main/java/com/github/davidmoten/rtree/OnSubscribeSearch.java b/src/main/java/com/github/davidmoten/rtree/OnSubscribeSearch.java index 1f23865d..8774c37d 100644 --- a/src/main/java/com/github/davidmoten/rtree/OnSubscribeSearch.java +++ b/src/main/java/com/github/davidmoten/rtree/OnSubscribeSearch.java @@ -4,6 +4,7 @@ import com.github.davidmoten.guavamini.annotations.VisibleForTesting; import com.github.davidmoten.rtree.geometry.Geometry; +import com.github.davidmoten.rx.util.BackpressureUtils; import com.github.davidmoten.util.ImmutableStack; import rx.Observable.OnSubscribe; @@ -72,17 +73,16 @@ private void requestSome(long n) { // rxjava used AtomicLongFieldUpdater instead of AtomicLong // but benchmarks showed no benefit here so reverted to AtomicLong - long previousCount = requested.getAndAdd(n); + long previousCount = BackpressureUtils.getAndAddRequest(requested, n); if (previousCount == 0) { // don't touch stack every time during the loop because // is a volatile and every write forces a thread memory // cache flush ImmutableStack> st = stack; while (true) { + // minimize atomic reads by assigning to a variable here long r = requested.get(); - long numToEmit = r; - - st = Backpressure.search(condition, subscriber, st, numToEmit); + st = Backpressure.search(condition, subscriber, st, r); if (st.isEmpty()) { if (!subscriber.isUnsubscribed()) { subscriber.onCompleted(); diff --git a/src/test/java/com/github/davidmoten/rtree/RTreeTest.java b/src/test/java/com/github/davidmoten/rtree/RTreeTest.java index 438b8efa..76d6c812 100644 --- a/src/test/java/com/github/davidmoten/rtree/RTreeTest.java +++ b/src/test/java/com/github/davidmoten/rtree/RTreeTest.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; @@ -903,7 +904,7 @@ public void testIntersectsPointLine() { assertTrue(Intersects.lineIntersectsPoint.call(line(1, 1, 2, 2), point(1, 1))); } - @Test(timeout = 3000) + @Test(timeout = 30000000) public void testGroupByIssue40() { RTree tree = RTree.star().create(); @@ -915,11 +916,10 @@ public void testGroupByIssue40() { tree = tree.add(6, Geometries.point(13.0, 52.0)); Rectangle rectangle = Geometries.rectangle(12.9, 51.9, 13.1, 52.1); - assertEquals(Integer.valueOf(6), tree.search(rectangle).count().toBlocking().single()); assertEquals(Integer.valueOf(2), tree.search(rectangle).doOnRequest(new Action1() { @Override public void call(Long n) { - System.out.println(n); + System.out.println("requestFromGroupBy=" + n); } }).groupBy(new Func1, Boolean>() { @Override @@ -927,6 +927,11 @@ public Boolean call(Entry entry) { System.out.println(entry); return entry.value() % 2 == 0; } + }).doOnRequest(new Action1() { + @Override + public void call(Long n) { + System.out.println("requestFromFlatMap=" + n); + } }).flatMap( new Func1>, Observable>() { @Override @@ -937,8 +942,8 @@ public Observable call( }).count().toBlocking().single()); } - @Test(timeout = 3000) - public void testBackpressureSearchWhenLotsRequestedButNotMaxValue() { + @Test + public void testBackpressureForOverflow() { RTree tree = RTree.star().create(); tree = tree.add(1, Geometries.point(13.0, 52.0)); @@ -947,36 +952,34 @@ public void testBackpressureSearchWhenLotsRequestedButNotMaxValue() { tree = tree.add(4, Geometries.point(13.0, 52.0)); tree = tree.add(5, Geometries.point(13.0, 52.0)); tree = tree.add(6, Geometries.point(13.0, 52.0)); - + final AtomicInteger count = new AtomicInteger(); Rectangle rectangle = Geometries.rectangle(12.9, 51.9, 13.1, 52.1); - - tree.search(rectangle).doOnRequest(new Action1() { - @Override - public void call(Long n) { - System.out.println(n); - } - }).subscribe(new Subscriber() { + tree.search(rectangle).subscribe(new Subscriber() { @Override public void onStart() { - request(Long.MAX_VALUE - 100); + request(4); } @Override public void onCompleted() { + } @Override - public void onError(Throwable arg0) { + public void onError(Throwable e) { } @Override - public void onNext(Object arg0) { - request(1); - request(1); + public void onNext(Object t) { + request(Long.MAX_VALUE); + count.incrementAndGet(); } }); + assertEquals(6, count.get()); + assertEquals(6, (int) tree.search(rectangle).count().toBlocking().single()); + } private static Func2 distanceCircleToPoint = new Func2() {