diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 95abc2871f..7d4d7d2e7a 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -3425,6 +3425,19 @@ public final Observable cache() { return create(new OnSubscribeCache(this)); } + /** + * {@code cache} with initial capacity. + * + * @param capacity + * initial cache size + * @return an Observable that, when first subscribed to, caches all of its items and notifications for the + * benefit of subsequent subscribers + * @see RxJava Wiki: cache() + */ + public final Observable cache(int capacity) { + return create(new OnSubscribeCache(this, capacity)); + } + /** * Returns an Observable that emits the items emitted by the source Observable, converted to the specified * type. diff --git a/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeCache.java b/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeCache.java index d9ffa1936e..b053dcafd4 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeCache.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OnSubscribeCache.java @@ -52,6 +52,10 @@ public OnSubscribeCache(Observable source) { this(source, ReplaySubject. create()); } + public OnSubscribeCache(Observable source, int capacity) { + this(source, ReplaySubject. create(capacity)); + } + /* accessible to tests */OnSubscribeCache(Observable source, Subject cache) { this.source = source; this.cache = cache; diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index a983d7fc8b..c4da2bd270 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -694,6 +694,54 @@ public void call(String v) { assertEquals(1, counter.get()); } + @Test + public void testCacheWithCapacity() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + Observable o = Observable.create(new OnSubscribe() { + + @Override + public void call(final Subscriber observer) { + new Thread(new Runnable() { + + @Override + public void run() { + counter.incrementAndGet(); + observer.onNext("one"); + observer.onCompleted(); + } + }).start(); + } + }).cache(1); + + // we then expect the following 2 subscriptions to get that same value + final CountDownLatch latch = new CountDownLatch(2); + + // subscribe once + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + latch.countDown(); + } + }); + + // subscribe again + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + latch.countDown(); + } + }); + + if (!latch.await(1000, TimeUnit.MILLISECONDS)) { + fail("subscriptions did not receive values"); + } + assertEquals(1, counter.get()); + } + /** * https://github.com/Netflix/RxJava/issues/198 *