diff options
Diffstat (limited to 'guava/src/com/google/common/util/concurrent/Futures.java')
-rw-r--r-- | guava/src/com/google/common/util/concurrent/Futures.java | 985 |
1 files changed, 417 insertions, 568 deletions
diff --git a/guava/src/com/google/common/util/concurrent/Futures.java b/guava/src/com/google/common/util/concurrent/Futures.java index aad6b43..dc703c5 100644 --- a/guava/src/com/google/common/util/concurrent/Futures.java +++ b/guava/src/com/google/common/util/concurrent/Futures.java @@ -21,14 +21,15 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor; import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; +import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly; +import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly; import static java.lang.Thread.currentThread; import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import com.google.common.annotations.Beta; import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; @@ -38,27 +39,22 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.UndeclaredThrowableException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; -import java.util.logging.Logger; import javax.annotation.Nullable; /** * Static utility methods pertaining to the {@link Future} interface. * - * <p>Many of these methods use the {@link ListenableFuture} API; consult the - * Guava User Guide article on <a href= - * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained"> - * {@code ListenableFuture}</a>. - * * @author Kevin Bourrillion * @author Nishant Thakkar * @author Sven Mawson @@ -75,7 +71,7 @@ public final class Futures { * * <p>The given mapping function will be applied to an * {@link InterruptedException}, a {@link CancellationException}, or an - * {@link ExecutionException}. + * {@link ExecutionException} with the actual cause of the exception. * See {@link Future#get()} for details on the exceptions thrown. * * @since 9.0 (source-compatible since 1.0) @@ -85,151 +81,6 @@ public final class Futures { return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper); } - private abstract static class ImmediateFuture<V> - implements ListenableFuture<V> { - - private static final Logger log = - Logger.getLogger(ImmediateFuture.class.getName()); - - @Override - public void addListener(Runnable listener, Executor executor) { - checkNotNull(listener, "Runnable was null."); - checkNotNull(executor, "Executor was null."); - try { - executor.execute(listener); - } catch (RuntimeException e) { - // ListenableFuture's contract is that it will not throw unchecked - // exceptions, so log the bad runnable and/or executor and swallow it. - log.log(Level.SEVERE, "RuntimeException while executing runnable " - + listener + " with executor " + executor, e); - } - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; - } - - @Override - public abstract V get() throws ExecutionException; - - @Override - public V get(long timeout, TimeUnit unit) throws ExecutionException { - checkNotNull(unit); - return get(); - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return true; - } - } - - private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> { - - @Nullable private final V value; - - ImmediateSuccessfulFuture(@Nullable V value) { - this.value = value; - } - - @Override - public V get() { - return value; - } - } - - private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception> - extends ImmediateFuture<V> implements CheckedFuture<V, X> { - - @Nullable private final V value; - - ImmediateSuccessfulCheckedFuture(@Nullable V value) { - this.value = value; - } - - @Override - public V get() { - return value; - } - - @Override - public V checkedGet() { - return value; - } - - @Override - public V checkedGet(long timeout, TimeUnit unit) { - checkNotNull(unit); - return value; - } - } - - private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> { - - private final Throwable thrown; - - ImmediateFailedFuture(Throwable thrown) { - this.thrown = thrown; - } - - @Override - public V get() throws ExecutionException { - throw new ExecutionException(thrown); - } - } - - private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> { - - private final CancellationException thrown; - - ImmediateCancelledFuture() { - this.thrown = new CancellationException("Immediate cancelled future."); - } - - @Override - public boolean isCancelled() { - return true; - } - - @Override - public V get() { - throw AbstractFuture.cancellationExceptionWithCause( - "Task was cancelled.", thrown); - } - } - - private static class ImmediateFailedCheckedFuture<V, X extends Exception> - extends ImmediateFuture<V> implements CheckedFuture<V, X> { - - private final X thrown; - - ImmediateFailedCheckedFuture(X thrown) { - this.thrown = thrown; - } - - @Override - public V get() throws ExecutionException { - throw new ExecutionException(thrown); - } - - @Override - public V checkedGet() throws X { - throw thrown; - } - - @Override - public V checkedGet(long timeout, TimeUnit unit) throws X { - checkNotNull(unit); - throw thrown; - } - } - /** * Creates a {@code ListenableFuture} which has its value set immediately upon * construction. The getters just return the value. This {@code Future} can't @@ -237,7 +88,9 @@ public final class Futures { * {@code true}. */ public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { - return new ImmediateSuccessfulFuture<V>(value); + SettableFuture<V> future = SettableFuture.create(); + future.set(value); + return future; } /** @@ -250,7 +103,14 @@ public final class Futures { */ public static <V, X extends Exception> CheckedFuture<V, X> immediateCheckedFuture(@Nullable V value) { - return new ImmediateSuccessfulCheckedFuture<V, X>(value); + SettableFuture<V> future = SettableFuture.create(); + future.set(value); + return Futures.makeChecked(future, new Function<Exception, X>() { + @Override + public X apply(Exception e) { + throw new AssertionError("impossible"); + } + }); } /** @@ -261,21 +121,15 @@ public final class Futures { * method always returns {@code true}. Calling {@code get()} will immediately * throw the provided {@code Throwable} wrapped in an {@code * ExecutionException}. + * + * @throws Error if the throwable is an {@link Error}. */ public static <V> ListenableFuture<V> immediateFailedFuture( Throwable throwable) { checkNotNull(throwable); - return new ImmediateFailedFuture<V>(throwable); - } - - /** - * Creates a {@code ListenableFuture} which is cancelled immediately upon - * construction, so that {@code isCancelled()} always returns {@code true}. - * - * @since 14.0 - */ - public static <V> ListenableFuture<V> immediateCancelledFuture() { - return new ImmediateCancelledFuture<V>(); + SettableFuture<V> future = SettableFuture.create(); + future.setException(throwable); + return future; } /** @@ -284,224 +138,149 @@ public final class Futures { * * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} * method always returns {@code true}. Calling {@code get()} will immediately - * throw the provided {@code Exception} wrapped in an {@code + * throw the provided {@code Throwable} wrapped in an {@code * ExecutionException}, and calling {@code checkedGet()} will throw the * provided exception itself. + * + * @throws Error if the throwable is an {@link Error}. */ public static <V, X extends Exception> CheckedFuture<V, X> - immediateFailedCheckedFuture(X exception) { + immediateFailedCheckedFuture(final X exception) { checkNotNull(exception); - return new ImmediateFailedCheckedFuture<V, X>(exception); + return makeChecked(Futures.<V>immediateFailedFuture(exception), + new Function<Exception, X>() { + @Override + public X apply(Exception e) { + return exception; + } + }); } /** - * Returns a {@code Future} whose result is taken from the given primary - * {@code input} or, if the primary input fails, from the {@code Future} - * provided by the {@code fallback}. {@link FutureFallback#create} is not - * invoked until the primary input has failed, so if the primary input - * succeeds, it is never invoked. If, during the invocation of {@code - * fallback}, an exception is thrown, this exception is used as the result of - * the output {@code Future}. - * - * <p>Below is an example of a fallback that returns a default value if an - * exception occurs: - * - * <pre> {@code - * ListenableFuture<Integer> fetchCounterFuture = ...; - * - * // Falling back to a zero counter in case an exception happens when - * // processing the RPC to fetch counters. - * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( - * fetchCounterFuture, new FutureFallback<Integer>() { - * public ListenableFuture<Integer> create(Throwable t) { - * // Returning "0" as the default for the counter when the - * // exception happens. - * return immediateFuture(0); - * } - * }); - * }</pre> - * - * The fallback can also choose to propagate the original exception when - * desired: + * <p>Returns a new {@code ListenableFuture} whose result is asynchronously + * derived from the result of the given {@code Future}. More precisely, the + * returned {@code Future} takes its result from a {@code Future} produced by + * applying the given {@code Function} to the result of the original {@code + * Future}. Example: * * <pre> {@code - * ListenableFuture<Integer> fetchCounterFuture = ...; - * - * // Falling back to a zero counter only in case the exception was a - * // TimeoutException. - * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( - * fetchCounterFuture, new FutureFallback<Integer>() { - * public ListenableFuture<Integer> create(Throwable t) { - * if (t instanceof TimeoutException) { - * return immediateFuture(0); - * } - * return immediateFailedFuture(t); + * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); + * Function<RowKey, ListenableFuture<QueryResult>> queryFunction = + * new Function<RowKey, ListenableFuture<QueryResult>>() { + * public ListenableFuture<QueryResult> apply(RowKey rowKey) { + * return dataService.read(rowKey); * } - * }); + * }; + * ListenableFuture<QueryResult> queryFuture = + * chain(rowKeyFuture, queryFunction); * }</pre> * - * Note: If the derived {@code Future} is slow or heavyweight to create - * (whether the {@code Future} itself is slow or heavyweight to complete is - * irrelevant), consider {@linkplain #withFallback(ListenableFuture, - * FutureFallback, Executor) supplying an executor}. If you do not supply an - * executor, {@code withFallback} will use {@link - * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some - * caveats for heavier operations. For example, the call to {@code - * fallback.create} may run on an unpredictable or undesirable thread: - * - * <ul> - * <li>If the input {@code Future} is done at the time {@code withFallback} - * is called, {@code withFallback} will call {@code fallback.create} inline. - * <li>If the input {@code Future} is not yet done, {@code withFallback} will - * schedule {@code fallback.create} to be run by the thread that completes - * the input {@code Future}, which may be an internal system thread such as - * an RPC network thread. - * </ul> + * <p>Note: This overload of {@code chain} is designed for cases in which the + * work of creating the derived future is fast and lightweight, as the method + * does not accept an {@code Executor} in which to perform the the work. For + * heavier derivations, this overload carries some caveats: First, the thread + * that the derivation runs in depends on whether the input {@code Future} is + * done at the time {@code chain} is called. In particular, if called late, + * {@code chain} will run the derivation in the thread that called {@code + * chain}. Second, derivations may run in an internal thread of the system + * responsible for the input {@code Future}, such as an RPC network thread. + * Finally, during the execution of a {@code sameThreadExecutor} {@code + * chain} function, all other registered but unexecuted listeners are + * prevented from running, even if those listeners are to run in other + * executors. * - * Also note that, regardless of which thread executes {@code - * fallback.create}, all other registered but unexecuted listeners are - * prevented from running during its execution, even if those listeners are - * to run in other executors. + * <p>The returned {@code Future} attempts to keep its cancellation state in + * sync with that of the input future and that of the future returned by the + * chain function. That is, if the returned {@code Future} is cancelled, it + * will attempt to cancel the other two, and if either of the other two is + * cancelled, the returned {@code Future} will receive a callback in which it + * will attempt to cancel itself. * - * @param input the primary input {@code Future} - * @param fallback the {@link FutureFallback} implementation to be called if - * {@code input} fails - * @since 14.0 + * @param input The future to chain + * @param function A function to chain the results of the provided future + * to the results of the returned future. This will be run in the thread + * that notifies input it is complete. + * @return A future that holds result of the chain. + * @deprecated Convert your {@code Function} to a {@code AsyncFunction}, and + * use {@link #transform(ListenableFuture, AsyncFunction)}. This method is + * scheduled to be removed from Guava in Guava release 12.0. */ - public static <V> ListenableFuture<V> withFallback( - ListenableFuture<? extends V> input, - FutureFallback<? extends V> fallback) { - return withFallback(input, fallback, sameThreadExecutor()); + @Deprecated + public static <I, O> ListenableFuture<O> chain( + ListenableFuture<I> input, + Function<? super I, ? extends ListenableFuture<? extends O>> function) { + return chain(input, function, MoreExecutors.sameThreadExecutor()); } /** - * Returns a {@code Future} whose result is taken from the given primary - * {@code input} or, if the primary input fails, from the {@code Future} - * provided by the {@code fallback}. {@link FutureFallback#create} is not - * invoked until the primary input has failed, so if the primary input - * succeeds, it is never invoked. If, during the invocation of {@code - * fallback}, an exception is thrown, this exception is used as the result of - * the output {@code Future}. - * - * <p>Below is an example of a fallback that returns a default value if an - * exception occurs: + * <p>Returns a new {@code ListenableFuture} whose result is asynchronously + * derived from the result of the given {@code Future}. More precisely, the + * returned {@code Future} takes its result from a {@code Future} produced by + * applying the given {@code Function} to the result of the original {@code + * Future}. Example: * * <pre> {@code - * ListenableFuture<Integer> fetchCounterFuture = ...; - * - * // Falling back to a zero counter in case an exception happens when - * // processing the RPC to fetch counters. - * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( - * fetchCounterFuture, new FutureFallback<Integer>() { - * public ListenableFuture<Integer> create(Throwable t) { - * // Returning "0" as the default for the counter when the - * // exception happens. - * return immediateFuture(0); + * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); + * Function<RowKey, ListenableFuture<QueryResult>> queryFunction = + * new Function<RowKey, ListenableFuture<QueryResult>>() { + * public ListenableFuture<QueryResult> apply(RowKey rowKey) { + * return dataService.read(rowKey); * } - * }, sameThreadExecutor()); + * }; + * ListenableFuture<QueryResult> queryFuture = + * chain(rowKeyFuture, queryFunction, executor); * }</pre> * - * The fallback can also choose to propagate the original exception when - * desired: - * - * <pre> {@code - * ListenableFuture<Integer> fetchCounterFuture = ...; - * - * // Falling back to a zero counter only in case the exception was a - * // TimeoutException. - * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( - * fetchCounterFuture, new FutureFallback<Integer>() { - * public ListenableFuture<Integer> create(Throwable t) { - * if (t instanceof TimeoutException) { - * return immediateFuture(0); - * } - * return immediateFailedFuture(t); - * } - * }, sameThreadExecutor()); - * }</pre> + * <p>The returned {@code Future} attempts to keep its cancellation state in + * sync with that of the input future and that of the future returned by the + * chain function. That is, if the returned {@code Future} is cancelled, it + * will attempt to cancel the other two, and if either of the other two is + * cancelled, the returned {@code Future} will receive a callback in which it + * will attempt to cancel itself. * - * When the execution of {@code fallback.create} is fast and lightweight - * (though the {@code Future} it returns need not meet these criteria), - * consider {@linkplain #withFallback(ListenableFuture, FutureFallback) - * omitting the executor} or explicitly specifying {@code - * sameThreadExecutor}. However, be aware of the caveats documented in the - * link above. - * - * @param input the primary input {@code Future} - * @param fallback the {@link FutureFallback} implementation to be called if - * {@code input} fails - * @param executor the executor that runs {@code fallback} if {@code input} - * fails - * @since 14.0 - */ - public static <V> ListenableFuture<V> withFallback( - ListenableFuture<? extends V> input, - FutureFallback<? extends V> fallback, Executor executor) { - checkNotNull(fallback); - return new FallbackFuture<V>(input, fallback, executor); - } - - /** - * A future that falls back on a second, generated future, in case its - * original future fails. + * <p>Note: For cases in which the work of creating the derived future is + * fast and lightweight, consider {@linkplain Futures#chain(ListenableFuture, + * Function) the other overload} or explicit use of {@code + * sameThreadExecutor}. For heavier derivations, this choice carries some + * caveats: First, the thread that the derivation runs in depends on whether + * the input {@code Future} is done at the time {@code chain} is called. In + * particular, if called late, {@code chain} will run the derivation in the + * thread that called {@code chain}. Second, derivations may run in an + * internal thread of the system responsible for the input {@code Future}, + * such as an RPC network thread. Finally, during the execution of a {@code + * sameThreadExecutor} {@code chain} function, all other registered but + * unexecuted listeners are prevented from running, even if those listeners + * are to run in other executors. + * + * @param input The future to chain + * @param function A function to chain the results of the provided future + * to the results of the returned future. + * @param executor Executor to run the function in. + * @return A future that holds result of the chain. + * @deprecated Convert your {@code Function} to a {@code AsyncFunction}, and + * use {@link #transform(ListenableFuture, AsyncFunction, Executor)}. This + * method is scheduled to be removed from Guava in Guava release 12.0. */ - private static class FallbackFuture<V> extends AbstractFuture<V> { - - private volatile ListenableFuture<? extends V> running; - - FallbackFuture(ListenableFuture<? extends V> input, - final FutureFallback<? extends V> fallback, - final Executor executor) { - running = input; - addCallback(running, new FutureCallback<V>() { - @Override - public void onSuccess(V value) { - set(value); - } - - @Override - public void onFailure(Throwable t) { - if (isCancelled()) { - return; - } - try { - running = fallback.create(t); - if (isCancelled()) { // in case cancel called in the meantime - running.cancel(wasInterrupted()); - return; - } - addCallback(running, new FutureCallback<V>() { - @Override - public void onSuccess(V value) { - set(value); - } - - @Override - public void onFailure(Throwable t) { - if (running.isCancelled()) { - cancel(false); - } else { - setException(t); - } - } - }, sameThreadExecutor()); - } catch (Exception e) { - setException(e); - } catch (Error e) { - setException(e); // note: rethrows + @Deprecated + public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input, + final Function<? super I, ? extends ListenableFuture<? extends O>> + function, + Executor executor) { + checkNotNull(function); + ChainingListenableFuture<I, O> chain = + new ChainingListenableFuture<I, O>(new AsyncFunction<I, O>() { + @Override + /* + * All methods of ListenableFuture are covariant, and we don't expose + * the object anywhere that would allow it to be downcast. + */ + @SuppressWarnings("unchecked") + public ListenableFuture<O> apply(I input) { + return (ListenableFuture) function.apply(input); } - } - }, executor); - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if (super.cancel(mayInterruptIfRunning)) { - running.cancel(mayInterruptIfRunning); - return true; - } - return false; - } + }, input); + input.addListener(chain, executor); + return chain; } /** @@ -523,28 +302,20 @@ public final class Futures { * transform(rowKeyFuture, queryFunction); * }</pre> * - * Note: If the derived {@code Future} is slow or heavyweight to create - * (whether the {@code Future} itself is slow or heavyweight to complete is - * irrelevant), consider {@linkplain #transform(ListenableFuture, - * AsyncFunction, Executor) supplying an executor}. If you do not supply an - * executor, {@code transform} will use {@link - * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some - * caveats for heavier operations. For example, the call to {@code - * function.apply} may run on an unpredictable or undesirable thread: - * - * <ul> - * <li>If the input {@code Future} is done at the time {@code transform} is - * called, {@code transform} will call {@code function.apply} inline. - * <li>If the input {@code Future} is not yet done, {@code transform} will - * schedule {@code function.apply} to be run by the thread that completes the - * input {@code Future}, which may be an internal system thread such as an - * RPC network thread. - * </ul> - * - * Also note that, regardless of which thread executes {@code - * function.apply}, all other registered but unexecuted listeners are - * prevented from running during its execution, even if those listeners are - * to run in other executors. + * <p>Note: This overload of {@code transform} is designed for cases in which + * the work of creating the derived {@code Future} is fast and lightweight, + * as the method does not accept an {@code Executor} in which to perform the + * the work. (The created {@code Future} itself need not complete quickly.) + * For heavier operations, this overload carries some caveats: First, the + * thread that {@code function.apply} runs in depends on whether the input + * {@code Future} is done at the time {@code transform} is called. In + * particular, if called late, {@code transform} will run the operation in + * the thread that called {@code transform}. Second, {@code function.apply} + * may run in an internal thread of the system responsible for the input + * {@code Future}, such as an RPC network thread. Finally, during the + * execution of a {@code sameThreadExecutor} {@code function.apply}, all + * other registered but unexecuted listeners are prevented from running, even + * if those listeners are to run in other executors. * * <p>The returned {@code Future} attempts to keep its cancellation state in * sync with that of the input future and that of the future returned by the @@ -591,11 +362,20 @@ public final class Futures { * cancelled, the returned {@code Future} will receive a callback in which it * will attempt to cancel itself. * - * <p>When the execution of {@code function.apply} is fast and lightweight - * (though the {@code Future} it returns need not meet these criteria), - * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting - * the executor} or explicitly specifying {@code sameThreadExecutor}. - * However, be aware of the caveats documented in the link above. + * <p>Note: For cases in which the work of creating the derived future is + * fast and lightweight, consider {@linkplain + * Futures#transform(ListenableFuture, Function) the other overload} or + * explicit use of {@code sameThreadExecutor}. For heavier derivations, this + * choice carries some caveats: First, the thread that {@code function.apply} + * runs in depends on whether the input {@code Future} is done at the time + * {@code transform} is called. In particular, if called late, {@code + * transform} will run the operation in the thread that called {@code + * transform}. Second, {@code function.apply} may run in an internal thread + * of the system responsible for the input {@code Future}, such as an RPC + * network thread. Finally, during the execution of a {@code + * sameThreadExecutor} {@code function.apply}, all other registered but + * unexecuted listeners are prevented from running, even if those listeners + * are to run in other executors. * * @param input The future to transform * @param function A function to transform the result of the input future @@ -631,26 +411,19 @@ public final class Futures { * transform(queryFuture, rowsFunction); * }</pre> * - * Note: If the transformation is slow or heavyweight, consider {@linkplain - * #transform(ListenableFuture, Function, Executor) supplying an executor}. - * If you do not supply an executor, {@code transform} will use {@link - * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some - * caveats for heavier operations. For example, the call to {@code - * function.apply} may run on an unpredictable or undesirable thread: - * - * <ul> - * <li>If the input {@code Future} is done at the time {@code transform} is - * called, {@code transform} will call {@code function.apply} inline. - * <li>If the input {@code Future} is not yet done, {@code transform} will - * schedule {@code function.apply} to be run by the thread that completes the - * input {@code Future}, which may be an internal system thread such as an - * RPC network thread. - * </ul> - * - * Also note that, regardless of which thread executes {@code - * function.apply}, all other registered but unexecuted listeners are - * prevented from running during its execution, even if those listeners are - * to run in other executors. + * <p>Note: This overload of {@code transform} is designed for cases in which + * the transformation is fast and lightweight, as the method does not accept + * an {@code Executor} in which to perform the the work. For heavier + * transformations, this overload carries some caveats: First, the thread + * that the transformation runs in depends on whether the input {@code + * Future} is done at the time {@code transform} is called. In particular, if + * called late, {@code transform} will perform the transformation in the + * thread that called {@code transform}. Second, transformations may run in + * an internal thread of the system responsible for the input {@code Future}, + * such as an RPC network thread. Finally, during the execution of a {@code + * sameThreadExecutor} transformation, all other registered but unexecuted + * listeners are prevented from running, even if those listeners are to run + * in other executors. * * <p>The returned {@code Future} attempts to keep its cancellation state in * sync with that of the input future. That is, if the returned {@code Future} @@ -661,16 +434,16 @@ public final class Futures { * <p>An example use of this method is to convert a serializable object * returned from an RPC into a POJO. * - * @param input The future to transform + * @param future The future to transform * @param function A Function to transform the results of the provided future * to the results of the returned future. This will be run in the thread * that notifies input it is complete. * @return A future that holds result of the transformation. * @since 9.0 (in 1.0 as {@code compose}) */ - public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, + public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future, final Function<? super I, ? extends O> function) { - return transform(input, function, MoreExecutors.sameThreadExecutor()); + return transform(future, function, MoreExecutors.sameThreadExecutor()); } /** @@ -699,29 +472,39 @@ public final class Futures { * <p>An example use of this method is to convert a serializable object * returned from an RPC into a POJO. * - * <p>When the transformation is fast and lightweight, consider {@linkplain - * #transform(ListenableFuture, Function) omitting the executor} or - * explicitly specifying {@code sameThreadExecutor}. However, be aware of the - * caveats documented in the link above. - * - * @param input The future to transform + * <p>Note: For cases in which the transformation is fast and lightweight, + * consider {@linkplain Futures#transform(ListenableFuture, Function) the + * other overload} or explicit use of {@link + * MoreExecutors#sameThreadExecutor}. For heavier transformations, this + * choice carries some caveats: First, the thread that the transformation + * runs in depends on whether the input {@code Future} is done at the time + * {@code transform} is called. In particular, if called late, {@code + * transform} will perform the transformation in the thread that called + * {@code transform}. Second, transformations may run in an internal thread + * of the system responsible for the input {@code Future}, such as an RPC + * network thread. Finally, during the execution of a {@code + * sameThreadExecutor} transformation, all other registered but unexecuted + * listeners are prevented from running, even if those listeners are to run + * in other executors. + * + * @param future The future to transform * @param function A Function to transform the results of the provided future * to the results of the returned future. * @param executor Executor to run the function in. * @return A future that holds result of the transformation. * @since 9.0 (in 2.0 as {@code compose}) */ - public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, + public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future, final Function<? super I, ? extends O> function, Executor executor) { checkNotNull(function); - AsyncFunction<I, O> wrapperFunction - = new AsyncFunction<I, O>() { + Function<I, ListenableFuture<O>> wrapperFunction + = new Function<I, ListenableFuture<O>>() { @Override public ListenableFuture<O> apply(I input) { O output = function.apply(input); return immediateFuture(output); } }; - return transform(input, wrapperFunction, executor); + return chain(future, wrapperFunction, executor); } /** @@ -741,43 +524,43 @@ public final class Futures { * who don't have a {@code ListenableFuture} available and * do not mind repeated, lazy function evaluation. * - * @param input The future to transform + * @param future The future to transform * @param function A Function to transform the results of the provided future * to the results of the returned future. * @return A future that returns the result of the transformation. * @since 10.0 */ @Beta - public static <I, O> Future<O> lazyTransform(final Future<I> input, + public static <I, O> Future<O> lazyTransform(final Future<I> future, final Function<? super I, ? extends O> function) { - checkNotNull(input); + checkNotNull(future); checkNotNull(function); return new Future<O>() { @Override public boolean cancel(boolean mayInterruptIfRunning) { - return input.cancel(mayInterruptIfRunning); + return future.cancel(mayInterruptIfRunning); } @Override public boolean isCancelled() { - return input.isCancelled(); + return future.isCancelled(); } @Override public boolean isDone() { - return input.isDone(); + return future.isDone(); } @Override public O get() throws InterruptedException, ExecutionException { - return applyTransformation(input.get()); + return applyTransformation(future.get()); } @Override public O get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return applyTransformation(input.get(timeout, unit)); + return applyTransformation(future.get(timeout, unit)); } private O applyTransformation(I input) throws ExecutionException { @@ -806,6 +589,8 @@ public final class Futures { private AsyncFunction<? super I, ? extends O> function; private ListenableFuture<? extends I> inputFuture; private volatile ListenableFuture<? extends O> outputFuture; + private final BlockingQueue<Boolean> mayInterruptIfRunningChannel = + new LinkedBlockingQueue<Boolean>(1); private final CountDownLatch outputCreated = new CountDownLatch(1); private ChainingListenableFuture( @@ -815,6 +600,90 @@ public final class Futures { this.inputFuture = checkNotNull(inputFuture); } + /** + * Delegate the get() to the input and output futures, in case + * their implementations defer starting computation until their + * own get() is invoked. + */ + @Override + public O get() throws InterruptedException, ExecutionException { + if (!isDone()) { + // Invoking get on the inputFuture will ensure our own run() + // method below is invoked as a listener when inputFuture sets + // its value. Therefore when get() returns we should then see + // the outputFuture be created. + ListenableFuture<? extends I> inputFuture = this.inputFuture; + if (inputFuture != null) { + inputFuture.get(); + } + + // If our listener was scheduled to run on an executor we may + // need to wait for our listener to finish running before the + // outputFuture has been constructed by the function. + outputCreated.await(); + + // Like above with the inputFuture, we have a listener on + // the outputFuture that will set our own value when its + // value is set. Invoking get will ensure the output can + // complete and invoke our listener, so that we can later + // get the result. + ListenableFuture<? extends O> outputFuture = this.outputFuture; + if (outputFuture != null) { + outputFuture.get(); + } + } + return super.get(); + } + + /** + * Delegate the get() to the input and output futures, in case + * their implementations defer starting computation until their + * own get() is invoked. + */ + @Override + public O get(long timeout, TimeUnit unit) throws TimeoutException, + ExecutionException, InterruptedException { + if (!isDone()) { + // Use a single time unit so we can decrease remaining timeout + // as we wait for various phases to complete. + if (unit != NANOSECONDS) { + timeout = NANOSECONDS.convert(timeout, unit); + unit = NANOSECONDS; + } + + // Invoking get on the inputFuture will ensure our own run() + // method below is invoked as a listener when inputFuture sets + // its value. Therefore when get() returns we should then see + // the outputFuture be created. + ListenableFuture<? extends I> inputFuture = this.inputFuture; + if (inputFuture != null) { + long start = System.nanoTime(); + inputFuture.get(timeout, unit); + timeout -= Math.max(0, System.nanoTime() - start); + } + + // If our listener was scheduled to run on an executor we may + // need to wait for our listener to finish running before the + // outputFuture has been constructed by the function. + long start = System.nanoTime(); + if (!outputCreated.await(timeout, unit)) { + throw new TimeoutException(); + } + timeout -= Math.max(0, System.nanoTime() - start); + + // Like above with the inputFuture, we have a listener on + // the outputFuture that will set our own value when its + // value is set. Invoking get will ensure the output can + // complete and invoke our listener, so that we can later + // get the result. + ListenableFuture<? extends O> outputFuture = this.outputFuture; + if (outputFuture != null) { + outputFuture.get(timeout, unit); + } + } + return super.get(timeout, unit); + } + @Override public boolean cancel(boolean mayInterruptIfRunning) { /* @@ -824,6 +693,7 @@ public final class Futures { if (super.cancel(mayInterruptIfRunning)) { // This should never block since only one thread is allowed to cancel // this Future. + putUninterruptibly(mayInterruptIfRunningChannel, mayInterruptIfRunning); cancel(inputFuture, mayInterruptIfRunning); cancel(outputFuture, mayInterruptIfRunning); return true; @@ -859,7 +729,13 @@ public final class Futures { final ListenableFuture<? extends O> outputFuture = this.outputFuture = function.apply(sourceResult); if (isCancelled()) { - outputFuture.cancel(wasInterrupted()); + // Handles the case where cancel was called while the function was + // being applied. + // There is a gap in cancel(boolean) between calling sync.cancel() + // and storing the value of mayInterruptIfRunning, so this thread + // needs to block, waiting for that value. + outputFuture.cancel( + takeUninterruptibly(mayInterruptIfRunningChannel)); this.outputFuture = null; return; } @@ -907,52 +783,14 @@ public final class Futures { } /** - * Returns a new {@code ListenableFuture} whose result is the product of - * calling {@code get()} on the {@code Future} nested within the given {@code - * Future}, effectively chaining the futures one after the other. Example: - * - * <pre> {@code - * SettableFuture<ListenableFuture<String>> nested = SettableFuture.create(); - * ListenableFuture<String> dereferenced = dereference(nested); - * }</pre> - * - * <p>This call has the same cancellation and execution semantics as {@link - * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code - * Future} attempts to keep its cancellation state in sync with both the - * input {@code Future} and the nested {@code Future}. The transformation - * is very lightweight and therefore takes place in the thread that called - * {@code dereference}. - * - * @param nested The nested future to transform. - * @return A future that holds result of the inner future. - * @since 13.0 - */ - @Beta - @SuppressWarnings({"rawtypes", "unchecked"}) - public static <V> ListenableFuture<V> dereference( - ListenableFuture<? extends ListenableFuture<? extends V>> nested) { - return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER); - } - - /** - * Helper {@code Function} for {@link #dereference}. - */ - private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER = - new AsyncFunction<ListenableFuture<Object>, Object>() { - @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) { - return input; - } - }; - - /** * Creates a new {@code ListenableFuture} whose value is a list containing the * values of all its input futures, if all succeed. If any input fails, the * returned future fails. * * <p>The list of results is in the same order as the input list. * - * <p>Canceling this future will attempt to cancel all the component futures, - * and if any of the provided futures fails or is canceled, this one is, + * <p>Canceling this future does not cancel any of the component futures; + * however, if any of the provided futures fails or is canceled, this one is, * too. * * @param futures futures to combine @@ -963,7 +801,7 @@ public final class Futures { @Beta public static <V> ListenableFuture<List<V>> allAsList( ListenableFuture<? extends V>... futures) { - return listFuture(ImmutableList.copyOf(futures), true, + return new ListFuture<V>(ImmutableList.copyOf(futures), true, MoreExecutors.sameThreadExecutor()); } @@ -974,8 +812,8 @@ public final class Futures { * * <p>The list of results is in the same order as the input list. * - * <p>Canceling this future will attempt to cancel all the component futures, - * and if any of the provided futures fails or is canceled, this one is, + * <p>Canceling this future does not cancel any of the component futures; + * however, if any of the provided futures fails or is canceled, this one is, * too. * * @param futures futures to combine @@ -986,7 +824,7 @@ public final class Futures { @Beta public static <V> ListenableFuture<List<V>> allAsList( Iterable<? extends ListenableFuture<? extends V>> futures) { - return listFuture(ImmutableList.copyOf(futures), true, + return new ListFuture<V>(ImmutableList.copyOf(futures), true, MoreExecutors.sameThreadExecutor()); } @@ -998,8 +836,6 @@ public final class Futures { * indistinguishable from the future having a successful value of * {@code null}). * - * <p>Canceling this future will attempt to cancel all the component futures. - * * @param futures futures to combine * @return a future that provides a list of the results of the component * futures @@ -1008,7 +844,7 @@ public final class Futures { @Beta public static <V> ListenableFuture<List<V>> successfulAsList( ListenableFuture<? extends V>... futures) { - return listFuture(ImmutableList.copyOf(futures), false, + return new ListFuture<V>(ImmutableList.copyOf(futures), false, MoreExecutors.sameThreadExecutor()); } @@ -1020,8 +856,6 @@ public final class Futures { * indistinguishable from the future having a successful value of * {@code null}). * - * <p>Canceling this future will attempt to cancel all the component futures. - * * @param futures futures to combine * @return a future that provides a list of the results of the component * futures @@ -1030,7 +864,7 @@ public final class Futures { @Beta public static <V> ListenableFuture<List<V>> successfulAsList( Iterable<? extends ListenableFuture<? extends V>> futures) { - return listFuture(ImmutableList.copyOf(futures), false, + return new ListFuture<V>(ImmutableList.copyOf(futures), false, MoreExecutors.sameThreadExecutor()); } @@ -1055,26 +889,19 @@ public final class Futures { * } * });}</pre> * - * Note: If the callback is slow or heavyweight, consider {@linkplain - * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an - * executor}. If you do not supply an executor, {@code addCallback} will use - * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries - * some caveats for heavier operations. For example, the callback may run on - * an unpredictable or undesirable thread: - * - * <ul> - * <li>If the input {@code Future} is done at the time {@code addCallback} is - * called, {@code addCallback} will execute the callback inline. - * <li>If the input {@code Future} is not yet done, {@code addCallback} will - * schedule the callback to be run by the thread that completes the input - * {@code Future}, which may be an internal system thread such as an RPC - * network thread. - * </ul> - * - * Also note that, regardless of which thread executes the callback, all - * other registered but unexecuted listeners are prevented from running - * during its execution, even if those listeners are to run in other - * executors. + * <p>Note: This overload of {@code addCallback} is designed for cases in + * which the callack is fast and lightweight, as the method does not accept + * an {@code Executor} in which to perform the the work. For heavier + * callbacks, this overload carries some caveats: First, the thread that the + * callback runs in depends on whether the input {@code Future} is done at the + * time {@code addCallback} is called and on whether the input {@code Future} + * is ever cancelled. In particular, {@code addCallback} may execute the + * callback in the thread that calls {@code addCallback} or {@code + * Future.cancel}. Second, callbacks may run in an internal thread of the + * system responsible for the input {@code Future}, such as an RPC network + * thread. Finally, during the execution of a {@code sameThreadExecutor} + * callback, all other registered but unexecuted listeners are prevented from + * running, even if those listeners are to run in other executors. * * <p>For a more general interface to attach a completion listener to a * {@code Future}, see {@link ListenableFuture#addListener addListener}. @@ -1111,10 +938,20 @@ public final class Futures { * } * });}</pre> * - * When the callback is fast and lightweight, consider {@linkplain - * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or - * explicitly specifying {@code sameThreadExecutor}. However, be aware of the - * caveats documented in the link above. + * When the callback is fast and lightweight consider {@linkplain + * Futures#addCallback(ListenableFuture, FutureCallback) the other overload} + * or explicit use of {@link MoreExecutors#sameThreadExecutor + * sameThreadExecutor}. For heavier callbacks, this choice carries some + * caveats: First, the thread that the callback runs in depends on whether + * the input {@code Future} is done at the time {@code addCallback} is called + * and on whether the input {@code Future} is ever cancelled. In particular, + * {@code addCallback} may execute the callback in the thread that calls + * {@code addCallback} or {@code Future.cancel}. Second, callbacks may run in + * an internal thread of the system responsible for the input {@code Future}, + * such as an RPC network thread. Finally, during the execution of a {@code + * sameThreadExecutor} callback, all other registered but unexecuted + * listeners are prevented from running, even if those listeners are to run + * in other executors. * * <p>For a more general interface to attach a completion listener to a * {@code Future}, see {@link ListenableFuture#addListener addListener}. @@ -1131,22 +968,18 @@ public final class Futures { Runnable callbackListener = new Runnable() { @Override public void run() { - final V value; try { // TODO(user): (Before Guava release), validate that this // is the thing for IE. - value = getUninterruptibly(future); + V value = getUninterruptibly(future); + callback.onSuccess(value); } catch (ExecutionException e) { callback.onFailure(e.getCause()); - return; } catch (RuntimeException e) { callback.onFailure(e); - return; } catch (Error e) { callback.onFailure(e); - return; } - callback.onSuccess(value); } }; future.addListener(callbackListener, executor); @@ -1434,53 +1267,49 @@ public final class Futures { } } - private interface FutureCombiner<V, C> { - C combine(List<Optional<V>> values); - } - - private static class CombinedFuture<V, C> extends AbstractFuture<C> { - ImmutableCollection<? extends ListenableFuture<? extends V>> futures; + /** + * Class that implements {@link #allAsList} and {@link #successfulAsList}. + * The idea is to create a (null-filled) List and register a listener with + * each component future to fill out the value in the List when that future + * completes. + */ + private static class ListFuture<V> extends AbstractFuture<List<V>> { + ImmutableList<? extends ListenableFuture<? extends V>> futures; final boolean allMustSucceed; final AtomicInteger remaining; - FutureCombiner<V, C> combiner; - List<Optional<V>> values; + List<V> values; - CombinedFuture( - ImmutableCollection<? extends ListenableFuture<? extends V>> futures, - boolean allMustSucceed, Executor listenerExecutor, - FutureCombiner<V, C> combiner) { + /** + * Constructor. + * + * @param futures all the futures to build the list from + * @param allMustSucceed whether a single failure or cancellation should + * propagate to this future + * @param listenerExecutor used to run listeners on all the passed in + * futures. + */ + ListFuture( + final ImmutableList<? extends ListenableFuture<? extends V>> futures, + final boolean allMustSucceed, final Executor listenerExecutor) { this.futures = futures; + this.values = Lists.newArrayListWithCapacity(futures.size()); this.allMustSucceed = allMustSucceed; this.remaining = new AtomicInteger(futures.size()); - this.combiner = combiner; - this.values = Lists.newArrayListWithCapacity(futures.size()); + init(listenerExecutor); } - /** - * Must be called at the end of the constructor. - */ - protected void init(final Executor listenerExecutor) { + private void init(final Executor listenerExecutor) { // First, schedule cleanup to execute when the Future is done. addListener(new Runnable() { @Override public void run() { - // Cancel all the component futures. - if (CombinedFuture.this.isCancelled()) { - for (ListenableFuture<?> future : CombinedFuture.this.futures) { - future.cancel(CombinedFuture.this.wasInterrupted()); - } - } - // By now the values array has either been set as the Future's value, // or (in case of failure) is no longer useful. - CombinedFuture.this.futures = null; + ListFuture.this.values = null; // Let go of the memory held by other futures - CombinedFuture.this.values = null; - - // The combiner may also hold state, so free that as well - CombinedFuture.this.combiner = null; + ListFuture.this.futures = null; } }, MoreExecutors.sameThreadExecutor()); @@ -1488,7 +1317,7 @@ public final class Futures { // Corner case: List is empty. if (futures.isEmpty()) { - set(combiner.combine(ImmutableList.<Optional<V>>of())); + set(Lists.newArrayList(values)); return; } @@ -1503,11 +1332,11 @@ public final class Futures { // this loop, the last call to addListener() will callback to // setOneValue(), transitively call our cleanup listener, and set // this.futures to null. - // This is not actually a problem, since the foreach only needs - // this.futures to be non-null at the beginning of the loop. - int i = 0; - for (final ListenableFuture<? extends V> listenable : futures) { - final int index = i++; + // We store a reference to futures to avoid the NPE. + ImmutableList<? extends ListenableFuture<? extends V>> localFutures = futures; + for (int i = 0; i < localFutures.size(); i++) { + final ListenableFuture<? extends V> listenable = localFutures.get(i); + final int index = i; listenable.addListener(new Runnable() { @Override public void run() { @@ -1521,12 +1350,12 @@ public final class Futures { * Sets the value at the given index to that of the given future. */ private void setOneValue(int index, Future<? extends V> future) { - List<Optional<V>> localValues = values; + List<V> localValues = values; if (isDone() || localValues == null) { // Some other future failed or has been cancelled, causing this one to // also be cancelled or have an exception set. This should only happen - // if allMustSucceed is true or if the output itself has been cancelled. - checkState(allMustSucceed || isCancelled(), + // if allMustSucceed is true. + checkState(allMustSucceed, "Future was done before all dependencies completed"); return; } @@ -1534,8 +1363,7 @@ public final class Futures { try { checkState(future.isDone(), "Tried to set value from future which is not done"); - V returnValue = getUninterruptibly(future); - localValues.set(index, Optional.fromNullable(returnValue)); + localValues.set(index, getUninterruptibly(future)); } catch (CancellationException e) { if (allMustSucceed) { // Set ourselves as cancelled. Let the input futures keep running @@ -1561,9 +1389,9 @@ public final class Futures { int newRemaining = remaining.decrementAndGet(); checkState(newRemaining >= 0, "Less than 0 remaining futures"); if (newRemaining == 0) { - FutureCombiner<V, C> localCombiner = combiner; - if (localCombiner != null) { - set(localCombiner.combine(localValues)); + localValues = values; + if (localValues != null) { + set(Lists.newArrayList(localValues)); } else { checkState(isDone()); } @@ -1571,25 +1399,46 @@ public final class Futures { } } - } + @Override + public List<V> get() throws InterruptedException, ExecutionException { + callAllGets(); - /** Used for {@link #allAsList} and {@link #successfulAsList}. */ - private static <V> ListenableFuture<List<V>> listFuture( - ImmutableList<ListenableFuture<? extends V>> futures, - boolean allMustSucceed, Executor listenerExecutor) { - return new CombinedFuture<V, List<V>>( - futures, allMustSucceed, listenerExecutor, - new FutureCombiner<V, List<V>>() { - @Override - public List<V> combine(List<Optional<V>> values) { - List<V> result = Lists.newArrayList(); - for (Optional<V> element : values) { - result.add(element != null ? element.orNull() : null); + // This may still block in spite of the calls above, as the listeners may + // be scheduled for execution in other threads. + return super.get(); + } + + /** + * Calls the get method of all dependency futures to work around a bug in + * some ListenableFutures where the listeners aren't called until get() is + * called. + */ + private void callAllGets() throws InterruptedException { + List<? extends ListenableFuture<? extends V>> oldFutures = futures; + if (oldFutures != null && !isDone()) { + for (ListenableFuture<? extends V> future : oldFutures) { + // We wait for a little while for the future, but if it's not done, + // we check that no other futures caused a cancellation or failure. + // This can introduce a delay of up to 10ms in reporting an exception. + while (!future.isDone()) { + try { + future.get(); + } catch (Error e) { + throw e; + } catch (InterruptedException e) { + throw e; + } catch (Throwable e) { + // ExecutionException / CancellationException / RuntimeException + if (allMustSucceed) { + return; + } else { + continue; + } } - // TODO(user): This should ultimately return an unmodifiableList - return result; } - }); + } + } + } } /** |