diff --git a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java index 326de38471..a94fd047ae 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java @@ -51,8 +51,36 @@ public CompositeSubscription(Subscription... subscriptions) { } } + /** + * Remove and unsubscribe all subscriptions but do not unsubscribe the outer CompositeSubscription. + */ + public void clear() { + Collection es = null; + for (Subscription s : subscriptions.keySet()) { + try { + s.unsubscribe(); + this.subscriptions.remove(s); + } catch (Throwable e) { + if (es == null) { + es = new ArrayList(); + } + es.add(e); + } + } + if (es != null) { + throw new CompositeException("Failed to unsubscribe to 1 or more subscriptions.", es); + } + } + + /** + * Remove the {@link Subscription} and unsubscribe it. + * + * @param s + */ public void remove(Subscription s) { this.subscriptions.remove(s); + // also unsubscribe from it: http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx + s.unsubscribe(); } public boolean isUnsubscribed() { diff --git a/rxjava-core/src/test/java/rx/subscriptions/CompositeSubscriptionTest.java b/rxjava-core/src/test/java/rx/subscriptions/CompositeSubscriptionTest.java index ce23bf1d68..551a6b2f66 100644 --- a/rxjava-core/src/test/java/rx/subscriptions/CompositeSubscriptionTest.java +++ b/rxjava-core/src/test/java/rx/subscriptions/CompositeSubscriptionTest.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -82,4 +82,66 @@ public void unsubscribe() { // we should still have unsubscribed to the second one assertEquals(1, counter.get()); } + + @Test + public void testRemoveUnsubscribes() { + BooleanSubscription s1 = new BooleanSubscription(); + BooleanSubscription s2 = new BooleanSubscription(); + + CompositeSubscription s = new CompositeSubscription(); + s.add(s1); + s.add(s2); + + s.remove(s1); + + assertTrue(s1.isUnsubscribed()); + assertFalse(s2.isUnsubscribed()); + } + + @Test + public void testClear() { + BooleanSubscription s1 = new BooleanSubscription(); + BooleanSubscription s2 = new BooleanSubscription(); + + CompositeSubscription s = new CompositeSubscription(); + s.add(s1); + s.add(s2); + + assertFalse(s1.isUnsubscribed()); + assertFalse(s2.isUnsubscribed()); + + s.clear(); + + assertTrue(s1.isUnsubscribed()); + assertTrue(s1.isUnsubscribed()); + assertFalse(s.isUnsubscribed()); + + BooleanSubscription s3 = new BooleanSubscription(); + + s.add(s3); + s.unsubscribe(); + + assertTrue(s3.isUnsubscribed()); + assertTrue(s.isUnsubscribed()); + } + + @Test + public void testUnsubscribeIdempotence() { + final AtomicInteger counter = new AtomicInteger(); + CompositeSubscription s = new CompositeSubscription(); + s.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + }); + + s.unsubscribe(); + s.unsubscribe(); + s.unsubscribe(); + + // we should have only unsubscribed once + assertEquals(1, counter.get()); + } }