diff --git a/src/main/java/rx/internal/schedulers/NewThreadWorker.java b/src/main/java/rx/internal/schedulers/NewThreadWorker.java index 75240e47e7..17128709db 100644 --- a/src/main/java/rx/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/rx/internal/schedulers/NewThreadWorker.java @@ -15,15 +15,14 @@ */ package rx.internal.schedulers; -import rx.Scheduler; -import rx.Subscription; +import java.lang.reflect.Method; +import java.util.concurrent.*; + +import rx.*; import rx.functions.Action0; -import rx.plugins.RxJavaPlugins; -import rx.plugins.RxJavaSchedulersHook; +import rx.plugins.*; import rx.subscriptions.Subscriptions; -import java.util.concurrent.*; - /** * @warn class description missing */ @@ -35,6 +34,19 @@ public class NewThreadWorker extends Scheduler.Worker implements Subscription { /* package */ public NewThreadWorker(ThreadFactory threadFactory) { executor = Executors.newScheduledThreadPool(1, threadFactory); + // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak + for (Method m : executor.getClass().getMethods()) { + if (m.getName().equals("setRemoveOnCancelPolicy") + && m.getParameterCount() == 1 + && m.getParameters()[0].getType() == Boolean.TYPE) { + try { + m.invoke(executor, true); + } catch (Exception ex) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(ex); + } + break; + } + } schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook(); } diff --git a/src/test/java/rx/schedulers/CachedThreadSchedulerTest.java b/src/test/java/rx/schedulers/CachedThreadSchedulerTest.java index e385e3828d..4b6b02eff2 100644 --- a/src/test/java/rx/schedulers/CachedThreadSchedulerTest.java +++ b/src/test/java/rx/schedulers/CachedThreadSchedulerTest.java @@ -16,12 +16,16 @@ package rx.schedulers; +import java.lang.management.*; +import java.util.concurrent.*; + +import junit.framework.Assert; + import org.junit.Test; + import rx.Observable; import rx.Scheduler; -import rx.functions.Action1; -import rx.functions.Func1; - +import rx.functions.*; import static org.junit.Assert.assertTrue; public class CachedThreadSchedulerTest extends AbstractSchedulerConcurrencyTests { @@ -66,4 +70,59 @@ public final void testUnhandledErrorIsDeliveredToThreadHandler() throws Interrup public final void testHandledErrorIsNotDeliveredToThreadHandler() throws InterruptedException { SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler()); } + + @Test(timeout = 30000) + public void testCancelledTaskRetention() throws InterruptedException { + try { + ScheduledThreadPoolExecutor.class.getMethod("setRemoveOnCancelPolicy", Boolean.TYPE); + + System.out.println("Wait before GC"); + Thread.sleep(1000); + + System.out.println("GC"); + System.gc(); + + Thread.sleep(1000); + + + MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); + long initial = memHeap.getUsed(); + + System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0); + + Scheduler.Worker w = Schedulers.io().createWorker(); + for (int i = 0; i < 750000; i++) { + if (i % 50000 == 0) { + System.out.println(" -> still scheduling: " + i); + } + w.schedule(Actions.empty(), 1, TimeUnit.DAYS); + } + + memHeap = memoryMXBean.getHeapMemoryUsage(); + long after = memHeap.getUsed(); + System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0); + + w.unsubscribe(); + + System.out.println("Wait before second GC"); + Thread.sleep(1000); + + System.out.println("Second GC"); + System.gc(); + + Thread.sleep(1000); + + memHeap = memoryMXBean.getHeapMemoryUsage(); + long finish = memHeap.getUsed(); + System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0); + + if (finish > initial * 5) { + Assert.fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d)); + } + } catch (NoSuchMethodException ex) { + // not supported, no reason to test for it + } + } + } \ No newline at end of file