From 71b5b85e1d219b706702356108fe1a9b2fbb15c1 Mon Sep 17 00:00:00 2001 From: clm Date: Fri, 14 Jul 2017 14:05:50 -0700 Subject: [PATCH] Add submitAsync and scheduleAsync methods, to ease the deprecation of Futures.dereference [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=162004818 --- .../common/util/concurrent/FuturesTest.java | 244 +++++++++++++++++- .../common/util/concurrent/Futures.java | 38 +++ .../TrustedListenableFutureTask.java | 58 ++++- .../util/concurrent/FuturesTest_gwt.java | 54 ++++ .../common/util/concurrent/FuturesTest.java | 244 +++++++++++++++++- .../common/util/concurrent/Futures.java | 38 +++ .../TrustedListenableFutureTask.java | 58 ++++- 7 files changed, 694 insertions(+), 40 deletions(-) diff --git a/android/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java b/android/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java index 812fc71f4626..2539212ad41d 100644 --- a/android/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java +++ b/android/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java @@ -38,6 +38,8 @@ import static com.google.common.util.concurrent.Futures.lazyTransform; import static com.google.common.util.concurrent.Futures.makeChecked; import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; +import static com.google.common.util.concurrent.Futures.scheduleAsync; +import static com.google.common.util.concurrent.Futures.submitAsync; import static com.google.common.util.concurrent.Futures.successfulAsList; import static com.google.common.util.concurrent.Futures.transform; import static com.google.common.util.concurrent.Futures.transformAsync; @@ -51,6 +53,7 @@ import static java.lang.Thread.currentThread; import static java.util.Arrays.asList; import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -79,6 +82,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -1727,15 +1731,13 @@ public ListenableFuture apply(String input) throws Exception { // Pause the executor. final CountDownLatch beforeFunction = new CountDownLatch(1); - @SuppressWarnings("unused") // go/futurereturn-lsc - Future possiblyIgnoredError = - executor.submit( - new Runnable() { - @Override - public void run() { - awaitUninterruptibly(beforeFunction); - } - }); + executor.execute( + new Runnable() { + @Override + public void run() { + awaitUninterruptibly(beforeFunction); + } + }); // Cancel the future after making input available. inputFuture.set("value"); @@ -1743,11 +1745,233 @@ public void run() { // Unpause the executor. beforeFunction.countDown(); - executor.awaitTermination(5, SECONDS); + executor.shutdown(); + assertTrue(executor.awaitTermination(5, SECONDS)); assertFalse(functionCalled.get()); } + public void testSubmitAsync_asyncCallable_error() throws InterruptedException { + final Error error = new Error("deliberate"); + AsyncCallable callable = + new AsyncCallable() { + @Override + public ListenableFuture call() { + throw error; + } + }; + SettableFuture inputFuture = SettableFuture.create(); + ListenableFuture outputFuture = submitAsync(callable, directExecutor()); + inputFuture.set("value"); + try { + getDone(outputFuture); + fail(); + } catch (ExecutionException expected) { + assertSame(error, expected.getCause()); + } + } + + public void testSubmitAsync_asyncCallable_nullInsteadOfFuture() throws Exception { + ListenableFuture chainedFuture = submitAsync(constantAsyncCallable(null), directExecutor()); + try { + getDone(chainedFuture); + fail(); + } catch (ExecutionException expected) { + NullPointerException cause = (NullPointerException) expected.getCause(); + assertThat(cause) + .hasMessage( + "AsyncCallable.call returned null instead of a Future. " + + "Did you mean to return immediateFuture(null)?"); + } + } + + @GwtIncompatible // threads + + public void testSubmitAsync_asyncCallable_cancelledWhileApplyingFunction() + throws InterruptedException, ExecutionException { + final CountDownLatch inFunction = new CountDownLatch(1); + final CountDownLatch callableDone = new CountDownLatch(1); + final SettableFuture resultFuture = SettableFuture.create(); + AsyncCallable callable = + new AsyncCallable() { + @Override + public ListenableFuture call() throws InterruptedException { + inFunction.countDown(); + callableDone.await(); + return resultFuture; + } + }; + SettableFuture inputFuture = SettableFuture.create(); + ListenableFuture future = submitAsync(callable, newSingleThreadExecutor()); + inputFuture.set("value"); + inFunction.await(); + future.cancel(false); + callableDone.countDown(); + try { + future.get(); + fail(); + } catch (CancellationException expected) { + } + try { + resultFuture.get(); + fail(); + } catch (CancellationException expected) { + } + } + + @GwtIncompatible // threads + + public void testSubmitAsync_asyncCallable_cancelledBeforeApplyingFunction() + throws InterruptedException { + final AtomicBoolean callableCalled = new AtomicBoolean(); + AsyncCallable callable = + new AsyncCallable() { + @Override + public ListenableFuture call() { + callableCalled.set(true); + return immediateFuture(1); + } + }; + ExecutorService executor = newSingleThreadExecutor(); + // Pause the executor. + final CountDownLatch beforeFunction = new CountDownLatch(1); + executor.execute( + new Runnable() { + @Override + public void run() { + awaitUninterruptibly(beforeFunction); + } + }); + ListenableFuture future = submitAsync(callable, executor); + future.cancel(false); + + // Unpause the executor. + beforeFunction.countDown(); + executor.shutdown(); + assertTrue(executor.awaitTermination(5, SECONDS)); + + assertFalse(callableCalled.get()); + } + + @GwtIncompatible // threads + + public void testScheduleAsync_asyncCallable_error() throws InterruptedException { + final Error error = new Error("deliberate"); + AsyncCallable callable = + new AsyncCallable() { + @Override + public ListenableFuture call() { + throw error; + } + }; + SettableFuture inputFuture = SettableFuture.create(); + ListenableFuture outputFuture = submitAsync(callable, directExecutor()); + inputFuture.set("value"); + try { + getDone(outputFuture); + fail(); + } catch (ExecutionException expected) { + assertSame(error, expected.getCause()); + } + } + + @GwtIncompatible // threads + + public void testScheduleAsync_asyncCallable_nullInsteadOfFuture() throws Exception { + ListenableFuture chainedFuture = + scheduleAsync( + constantAsyncCallable(null), + 1, + TimeUnit.NANOSECONDS, + newSingleThreadScheduledExecutor()); + try { + chainedFuture.get(); + fail(); + } catch (ExecutionException expected) { + NullPointerException cause = (NullPointerException) expected.getCause(); + assertThat(cause) + .hasMessage( + "AsyncCallable.call returned null instead of a Future. " + + "Did you mean to return immediateFuture(null)?"); + } + } + + @GwtIncompatible // threads + + public void testScheduleAsync_asyncCallable_cancelledWhileApplyingFunction() + throws InterruptedException, ExecutionException { + final CountDownLatch inFunction = new CountDownLatch(1); + final CountDownLatch callableDone = new CountDownLatch(1); + final SettableFuture resultFuture = SettableFuture.create(); + AsyncCallable callable = + new AsyncCallable() { + @Override + public ListenableFuture call() throws InterruptedException { + inFunction.countDown(); + callableDone.await(); + return resultFuture; + } + }; + ListenableFuture future = + scheduleAsync(callable, 1, TimeUnit.NANOSECONDS, newSingleThreadScheduledExecutor()); + inFunction.await(); + future.cancel(false); + callableDone.countDown(); + try { + future.get(); + fail(); + } catch (CancellationException expected) { + } + try { + resultFuture.get(); + fail(); + } catch (CancellationException expected) { + } + } + + @GwtIncompatible // threads + + public void testScheduleAsync_asyncCallable_cancelledBeforeCallingFunction() + throws InterruptedException { + final AtomicBoolean callableCalled = new AtomicBoolean(); + AsyncCallable callable = + new AsyncCallable() { + @Override + public ListenableFuture call() { + callableCalled.set(true); + return immediateFuture(1); + } + }; + ScheduledExecutorService executor = newSingleThreadScheduledExecutor(); + // Pause the executor. + final CountDownLatch beforeFunction = new CountDownLatch(1); + executor.execute( + new Runnable() { + @Override + public void run() { + awaitUninterruptibly(beforeFunction); + } + }); + ListenableFuture future = scheduleAsync(callable, 1, TimeUnit.NANOSECONDS, executor); + future.cancel(false); + + // Unpause the executor. + beforeFunction.countDown(); + executor.shutdown(); + assertTrue(executor.awaitTermination(5, SECONDS)); + + assertFalse(callableCalled.get()); + } + + private static AsyncCallable constantAsyncCallable(final ListenableFuture returnValue) { + return new AsyncCallable() { + @Override + public ListenableFuture call() { + return returnValue; + } + }; + } + public void testDereference_genericsWildcard() throws Exception { ListenableFuture inner = immediateFuture(null); ListenableFuture> outer = diff --git a/android/guava/src/com/google/common/util/concurrent/Futures.java b/android/guava/src/com/google/common/util/concurrent/Futures.java index 2001b897d66e..42194ea54ff5 100644 --- a/android/guava/src/com/google/common/util/concurrent/Futures.java +++ b/android/guava/src/com/google/common/util/concurrent/Futures.java @@ -240,6 +240,44 @@ public static CheckedFuture immediateFailedChecke return new ImmediateFailedCheckedFuture(exception); } + /** + * Executes {@code callable} on the specified {@code executor}, returning a {@code Future}. + * + * @throws RejectedExecutionException if the task cannot be scheduled for execution + * @since 23.0 + */ + public static ListenableFuture submitAsync(AsyncCallable callable, Executor executor) { + TrustedListenableFutureTask task = TrustedListenableFutureTask.create(callable); + executor.execute(task); + return task; + } + + /** + * Schedules {@code callable} on the specified {@code executor}, returning a {@code Future}. + * + * @throws RejectedExecutionException if the task cannot be scheduled for execution + * @since 23.0 + */ + @GwtIncompatible // java.util.concurrent.ScheduledExecutorService + public static ListenableFuture scheduleAsync( + AsyncCallable callable, + long delay, + TimeUnit timeUnit, + ScheduledExecutorService executorService) { + TrustedListenableFutureTask task = TrustedListenableFutureTask.create(callable); + final Future scheduled = executorService.schedule(task, delay, timeUnit); + task.addListener( + new Runnable() { + @Override + public void run() { + // Don't want to interrupt twice + scheduled.cancel(false); + } + }, + directExecutor()); + return task; + } + /** * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the * primary input fails with the given {@code exceptionType}, from the result provided by the diff --git a/android/guava/src/com/google/common/util/concurrent/TrustedListenableFutureTask.java b/android/guava/src/com/google/common/util/concurrent/TrustedListenableFutureTask.java index e59f8d480330..d0465a966c8e 100644 --- a/android/guava/src/com/google/common/util/concurrent/TrustedListenableFutureTask.java +++ b/android/guava/src/com/google/common/util/concurrent/TrustedListenableFutureTask.java @@ -33,12 +33,10 @@ class TrustedListenableFutureTask extends AbstractFuture.TrustedFuture implements RunnableFuture { - /** - * Creates a {@code ListenableFutureTask} that will upon running, execute the given - * {@code Callable}. - * - * @param callable the callable task - */ + static TrustedListenableFutureTask create(AsyncCallable callable) { + return new TrustedListenableFutureTask(callable); + } + static TrustedListenableFutureTask create(Callable callable) { return new TrustedListenableFutureTask(callable); } @@ -62,15 +60,19 @@ static TrustedListenableFutureTask create(Runnable runnable, @Nullable V * In certain circumstances, this field might theoretically not be visible to an afterDone() call * triggered by cancel(). For details, see the comments on the fields of TimeoutFuture. */ - private TrustedFutureInterruptibleTask task; + private InterruptibleTask task; TrustedListenableFutureTask(Callable callable) { this.task = new TrustedFutureInterruptibleTask(callable); } + TrustedListenableFutureTask(AsyncCallable callable) { + this.task = new TrustedFutureInterruptibleAsyncTask(callable); + } + @Override public void run() { - TrustedFutureInterruptibleTask localTask = task; + InterruptibleTask localTask = task; if (localTask != null) { localTask.run(); } @@ -81,7 +83,7 @@ protected void afterDone() { super.afterDone(); if (wasInterrupted()) { - TrustedFutureInterruptibleTask localTask = task; + InterruptibleTask localTask = task; if (localTask != null) { localTask.interruptTask(); } @@ -92,7 +94,7 @@ protected void afterDone() { @Override protected String pendingToString() { - TrustedFutureInterruptibleTask localTask = task; + InterruptibleTask localTask = task; if (localTask != null) { return "task=[" + localTask + "]"; } @@ -129,4 +131,40 @@ public String toString() { return callable.toString(); } } + + @WeakOuter + private final class TrustedFutureInterruptibleAsyncTask extends InterruptibleTask { + private final AsyncCallable callable; + + TrustedFutureInterruptibleAsyncTask(AsyncCallable callable) { + this.callable = checkNotNull(callable); + } + + @Override + void runInterruptibly() { + // Ensure we haven't been cancelled or already run. + if (!isDone()) { + try { + ListenableFuture result = callable.call(); + checkNotNull( + result, + "AsyncCallable.call returned null instead of a Future. " + + "Did you mean to return immediateFuture(null)?"); + setFuture(result); + } catch (Throwable t) { + setException(t); + } + } + } + + @Override + boolean wasInterrupted() { + return TrustedListenableFutureTask.this.wasInterrupted(); + } + + @Override + public String toString() { + return callable.toString(); + } + } } diff --git a/guava-gwt/test/com/google/common/util/concurrent/FuturesTest_gwt.java b/guava-gwt/test/com/google/common/util/concurrent/FuturesTest_gwt.java index fb5aa5585b8d..aa13681e9a60 100644 --- a/guava-gwt/test/com/google/common/util/concurrent/FuturesTest_gwt.java +++ b/guava-gwt/test/com/google/common/util/concurrent/FuturesTest_gwt.java @@ -1989,6 +1989,60 @@ public void testNonCancellationPropagating_successful() throws Exception { } } +public void testSubmitAsync_asyncCallable_error() throws Exception { + com.google.common.util.concurrent.FuturesTest testCase = new com.google.common.util.concurrent.FuturesTest(); + testCase.setUp(); + Throwable failure = null; + try { + testCase.testSubmitAsync_asyncCallable_error(); + } catch (Throwable t) { + failure = t; + } + try { + testCase.tearDown(); + } catch (Throwable t) { + if (failure == null) { + failure = t; + } + } + if (failure instanceof Exception) { + throw (Exception) failure; + } + if (failure instanceof Error) { + throw (Error) failure; + } + if (failure != null) { + throw new RuntimeException(failure); + } +} + +public void testSubmitAsync_asyncCallable_nullInsteadOfFuture() throws Exception { + com.google.common.util.concurrent.FuturesTest testCase = new com.google.common.util.concurrent.FuturesTest(); + testCase.setUp(); + Throwable failure = null; + try { + testCase.testSubmitAsync_asyncCallable_nullInsteadOfFuture(); + } catch (Throwable t) { + failure = t; + } + try { + testCase.tearDown(); + } catch (Throwable t) { + if (failure == null) { + failure = t; + } + } + if (failure instanceof Exception) { + throw (Exception) failure; + } + if (failure instanceof Error) { + throw (Error) failure; + } + if (failure != null) { + throw new RuntimeException(failure); + } +} + public void testSuccessfulAsList() throws Exception { com.google.common.util.concurrent.FuturesTest testCase = new com.google.common.util.concurrent.FuturesTest(); testCase.setUp(); diff --git a/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java b/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java index 812fc71f4626..2539212ad41d 100644 --- a/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java +++ b/guava-tests/test/com/google/common/util/concurrent/FuturesTest.java @@ -38,6 +38,8 @@ import static com.google.common.util.concurrent.Futures.lazyTransform; import static com.google.common.util.concurrent.Futures.makeChecked; import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; +import static com.google.common.util.concurrent.Futures.scheduleAsync; +import static com.google.common.util.concurrent.Futures.submitAsync; import static com.google.common.util.concurrent.Futures.successfulAsList; import static com.google.common.util.concurrent.Futures.transform; import static com.google.common.util.concurrent.Futures.transformAsync; @@ -51,6 +53,7 @@ import static java.lang.Thread.currentThread; import static java.util.Arrays.asList; import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -79,6 +82,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -1727,15 +1731,13 @@ public ListenableFuture apply(String input) throws Exception { // Pause the executor. final CountDownLatch beforeFunction = new CountDownLatch(1); - @SuppressWarnings("unused") // go/futurereturn-lsc - Future possiblyIgnoredError = - executor.submit( - new Runnable() { - @Override - public void run() { - awaitUninterruptibly(beforeFunction); - } - }); + executor.execute( + new Runnable() { + @Override + public void run() { + awaitUninterruptibly(beforeFunction); + } + }); // Cancel the future after making input available. inputFuture.set("value"); @@ -1743,11 +1745,233 @@ public void run() { // Unpause the executor. beforeFunction.countDown(); - executor.awaitTermination(5, SECONDS); + executor.shutdown(); + assertTrue(executor.awaitTermination(5, SECONDS)); assertFalse(functionCalled.get()); } + public void testSubmitAsync_asyncCallable_error() throws InterruptedException { + final Error error = new Error("deliberate"); + AsyncCallable callable = + new AsyncCallable() { + @Override + public ListenableFuture call() { + throw error; + } + }; + SettableFuture inputFuture = SettableFuture.create(); + ListenableFuture outputFuture = submitAsync(callable, directExecutor()); + inputFuture.set("value"); + try { + getDone(outputFuture); + fail(); + } catch (ExecutionException expected) { + assertSame(error, expected.getCause()); + } + } + + public void testSubmitAsync_asyncCallable_nullInsteadOfFuture() throws Exception { + ListenableFuture chainedFuture = submitAsync(constantAsyncCallable(null), directExecutor()); + try { + getDone(chainedFuture); + fail(); + } catch (ExecutionException expected) { + NullPointerException cause = (NullPointerException) expected.getCause(); + assertThat(cause) + .hasMessage( + "AsyncCallable.call returned null instead of a Future. " + + "Did you mean to return immediateFuture(null)?"); + } + } + + @GwtIncompatible // threads + + public void testSubmitAsync_asyncCallable_cancelledWhileApplyingFunction() + throws InterruptedException, ExecutionException { + final CountDownLatch inFunction = new CountDownLatch(1); + final CountDownLatch callableDone = new CountDownLatch(1); + final SettableFuture resultFuture = SettableFuture.create(); + AsyncCallable callable = + new AsyncCallable() { + @Override + public ListenableFuture call() throws InterruptedException { + inFunction.countDown(); + callableDone.await(); + return resultFuture; + } + }; + SettableFuture inputFuture = SettableFuture.create(); + ListenableFuture future = submitAsync(callable, newSingleThreadExecutor()); + inputFuture.set("value"); + inFunction.await(); + future.cancel(false); + callableDone.countDown(); + try { + future.get(); + fail(); + } catch (CancellationException expected) { + } + try { + resultFuture.get(); + fail(); + } catch (CancellationException expected) { + } + } + + @GwtIncompatible // threads + + public void testSubmitAsync_asyncCallable_cancelledBeforeApplyingFunction() + throws InterruptedException { + final AtomicBoolean callableCalled = new AtomicBoolean(); + AsyncCallable callable = + new AsyncCallable() { + @Override + public ListenableFuture call() { + callableCalled.set(true); + return immediateFuture(1); + } + }; + ExecutorService executor = newSingleThreadExecutor(); + // Pause the executor. + final CountDownLatch beforeFunction = new CountDownLatch(1); + executor.execute( + new Runnable() { + @Override + public void run() { + awaitUninterruptibly(beforeFunction); + } + }); + ListenableFuture future = submitAsync(callable, executor); + future.cancel(false); + + // Unpause the executor. + beforeFunction.countDown(); + executor.shutdown(); + assertTrue(executor.awaitTermination(5, SECONDS)); + + assertFalse(callableCalled.get()); + } + + @GwtIncompatible // threads + + public void testScheduleAsync_asyncCallable_error() throws InterruptedException { + final Error error = new Error("deliberate"); + AsyncCallable callable = + new AsyncCallable() { + @Override + public ListenableFuture call() { + throw error; + } + }; + SettableFuture inputFuture = SettableFuture.create(); + ListenableFuture outputFuture = submitAsync(callable, directExecutor()); + inputFuture.set("value"); + try { + getDone(outputFuture); + fail(); + } catch (ExecutionException expected) { + assertSame(error, expected.getCause()); + } + } + + @GwtIncompatible // threads + + public void testScheduleAsync_asyncCallable_nullInsteadOfFuture() throws Exception { + ListenableFuture chainedFuture = + scheduleAsync( + constantAsyncCallable(null), + 1, + TimeUnit.NANOSECONDS, + newSingleThreadScheduledExecutor()); + try { + chainedFuture.get(); + fail(); + } catch (ExecutionException expected) { + NullPointerException cause = (NullPointerException) expected.getCause(); + assertThat(cause) + .hasMessage( + "AsyncCallable.call returned null instead of a Future. " + + "Did you mean to return immediateFuture(null)?"); + } + } + + @GwtIncompatible // threads + + public void testScheduleAsync_asyncCallable_cancelledWhileApplyingFunction() + throws InterruptedException, ExecutionException { + final CountDownLatch inFunction = new CountDownLatch(1); + final CountDownLatch callableDone = new CountDownLatch(1); + final SettableFuture resultFuture = SettableFuture.create(); + AsyncCallable callable = + new AsyncCallable() { + @Override + public ListenableFuture call() throws InterruptedException { + inFunction.countDown(); + callableDone.await(); + return resultFuture; + } + }; + ListenableFuture future = + scheduleAsync(callable, 1, TimeUnit.NANOSECONDS, newSingleThreadScheduledExecutor()); + inFunction.await(); + future.cancel(false); + callableDone.countDown(); + try { + future.get(); + fail(); + } catch (CancellationException expected) { + } + try { + resultFuture.get(); + fail(); + } catch (CancellationException expected) { + } + } + + @GwtIncompatible // threads + + public void testScheduleAsync_asyncCallable_cancelledBeforeCallingFunction() + throws InterruptedException { + final AtomicBoolean callableCalled = new AtomicBoolean(); + AsyncCallable callable = + new AsyncCallable() { + @Override + public ListenableFuture call() { + callableCalled.set(true); + return immediateFuture(1); + } + }; + ScheduledExecutorService executor = newSingleThreadScheduledExecutor(); + // Pause the executor. + final CountDownLatch beforeFunction = new CountDownLatch(1); + executor.execute( + new Runnable() { + @Override + public void run() { + awaitUninterruptibly(beforeFunction); + } + }); + ListenableFuture future = scheduleAsync(callable, 1, TimeUnit.NANOSECONDS, executor); + future.cancel(false); + + // Unpause the executor. + beforeFunction.countDown(); + executor.shutdown(); + assertTrue(executor.awaitTermination(5, SECONDS)); + + assertFalse(callableCalled.get()); + } + + private static AsyncCallable constantAsyncCallable(final ListenableFuture returnValue) { + return new AsyncCallable() { + @Override + public ListenableFuture call() { + return returnValue; + } + }; + } + public void testDereference_genericsWildcard() throws Exception { ListenableFuture inner = immediateFuture(null); ListenableFuture> outer = diff --git a/guava/src/com/google/common/util/concurrent/Futures.java b/guava/src/com/google/common/util/concurrent/Futures.java index 2001b897d66e..42194ea54ff5 100644 --- a/guava/src/com/google/common/util/concurrent/Futures.java +++ b/guava/src/com/google/common/util/concurrent/Futures.java @@ -240,6 +240,44 @@ public static CheckedFuture immediateFailedChecke return new ImmediateFailedCheckedFuture(exception); } + /** + * Executes {@code callable} on the specified {@code executor}, returning a {@code Future}. + * + * @throws RejectedExecutionException if the task cannot be scheduled for execution + * @since 23.0 + */ + public static ListenableFuture submitAsync(AsyncCallable callable, Executor executor) { + TrustedListenableFutureTask task = TrustedListenableFutureTask.create(callable); + executor.execute(task); + return task; + } + + /** + * Schedules {@code callable} on the specified {@code executor}, returning a {@code Future}. + * + * @throws RejectedExecutionException if the task cannot be scheduled for execution + * @since 23.0 + */ + @GwtIncompatible // java.util.concurrent.ScheduledExecutorService + public static ListenableFuture scheduleAsync( + AsyncCallable callable, + long delay, + TimeUnit timeUnit, + ScheduledExecutorService executorService) { + TrustedListenableFutureTask task = TrustedListenableFutureTask.create(callable); + final Future scheduled = executorService.schedule(task, delay, timeUnit); + task.addListener( + new Runnable() { + @Override + public void run() { + // Don't want to interrupt twice + scheduled.cancel(false); + } + }, + directExecutor()); + return task; + } + /** * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the * primary input fails with the given {@code exceptionType}, from the result provided by the diff --git a/guava/src/com/google/common/util/concurrent/TrustedListenableFutureTask.java b/guava/src/com/google/common/util/concurrent/TrustedListenableFutureTask.java index e59f8d480330..d0465a966c8e 100644 --- a/guava/src/com/google/common/util/concurrent/TrustedListenableFutureTask.java +++ b/guava/src/com/google/common/util/concurrent/TrustedListenableFutureTask.java @@ -33,12 +33,10 @@ class TrustedListenableFutureTask extends AbstractFuture.TrustedFuture implements RunnableFuture { - /** - * Creates a {@code ListenableFutureTask} that will upon running, execute the given - * {@code Callable}. - * - * @param callable the callable task - */ + static TrustedListenableFutureTask create(AsyncCallable callable) { + return new TrustedListenableFutureTask(callable); + } + static TrustedListenableFutureTask create(Callable callable) { return new TrustedListenableFutureTask(callable); } @@ -62,15 +60,19 @@ static TrustedListenableFutureTask create(Runnable runnable, @Nullable V * In certain circumstances, this field might theoretically not be visible to an afterDone() call * triggered by cancel(). For details, see the comments on the fields of TimeoutFuture. */ - private TrustedFutureInterruptibleTask task; + private InterruptibleTask task; TrustedListenableFutureTask(Callable callable) { this.task = new TrustedFutureInterruptibleTask(callable); } + TrustedListenableFutureTask(AsyncCallable callable) { + this.task = new TrustedFutureInterruptibleAsyncTask(callable); + } + @Override public void run() { - TrustedFutureInterruptibleTask localTask = task; + InterruptibleTask localTask = task; if (localTask != null) { localTask.run(); } @@ -81,7 +83,7 @@ protected void afterDone() { super.afterDone(); if (wasInterrupted()) { - TrustedFutureInterruptibleTask localTask = task; + InterruptibleTask localTask = task; if (localTask != null) { localTask.interruptTask(); } @@ -92,7 +94,7 @@ protected void afterDone() { @Override protected String pendingToString() { - TrustedFutureInterruptibleTask localTask = task; + InterruptibleTask localTask = task; if (localTask != null) { return "task=[" + localTask + "]"; } @@ -129,4 +131,40 @@ public String toString() { return callable.toString(); } } + + @WeakOuter + private final class TrustedFutureInterruptibleAsyncTask extends InterruptibleTask { + private final AsyncCallable callable; + + TrustedFutureInterruptibleAsyncTask(AsyncCallable callable) { + this.callable = checkNotNull(callable); + } + + @Override + void runInterruptibly() { + // Ensure we haven't been cancelled or already run. + if (!isDone()) { + try { + ListenableFuture result = callable.call(); + checkNotNull( + result, + "AsyncCallable.call returned null instead of a Future. " + + "Did you mean to return immediateFuture(null)?"); + setFuture(result); + } catch (Throwable t) { + setException(t); + } + } + } + + @Override + boolean wasInterrupted() { + return TrustedListenableFutureTask.this.wasInterrupted(); + } + + @Override + public String toString() { + return callable.toString(); + } + } }