aboutsummaryrefslogtreecommitdiffstats
path: root/guava/src/com/google/common/util/concurrent/Futures.java
diff options
context:
space:
mode:
Diffstat (limited to 'guava/src/com/google/common/util/concurrent/Futures.java')
-rw-r--r--guava/src/com/google/common/util/concurrent/Futures.java985
1 files changed, 417 insertions, 568 deletions
diff --git a/guava/src/com/google/common/util/concurrent/Futures.java b/guava/src/com/google/common/util/concurrent/Futures.java
index aad6b43..dc703c5 100644
--- a/guava/src/com/google/common/util/concurrent/Futures.java
+++ b/guava/src/com/google/common/util/concurrent/Futures.java
@@ -21,14 +21,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
+import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
+import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
import static java.lang.Thread.currentThread;
import static java.util.Arrays.asList;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
import com.google.common.annotations.Beta;
import com.google.common.base.Function;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
@@ -38,27 +39,22 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* Static utility methods pertaining to the {@link Future} interface.
*
- * <p>Many of these methods use the {@link ListenableFuture} API; consult the
- * Guava User Guide article on <a href=
- * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained">
- * {@code ListenableFuture}</a>.
- *
* @author Kevin Bourrillion
* @author Nishant Thakkar
* @author Sven Mawson
@@ -75,7 +71,7 @@ public final class Futures {
*
* <p>The given mapping function will be applied to an
* {@link InterruptedException}, a {@link CancellationException}, or an
- * {@link ExecutionException}.
+ * {@link ExecutionException} with the actual cause of the exception.
* See {@link Future#get()} for details on the exceptions thrown.
*
* @since 9.0 (source-compatible since 1.0)
@@ -85,151 +81,6 @@ public final class Futures {
return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
}
- private abstract static class ImmediateFuture<V>
- implements ListenableFuture<V> {
-
- private static final Logger log =
- Logger.getLogger(ImmediateFuture.class.getName());
-
- @Override
- public void addListener(Runnable listener, Executor executor) {
- checkNotNull(listener, "Runnable was null.");
- checkNotNull(executor, "Executor was null.");
- try {
- executor.execute(listener);
- } catch (RuntimeException e) {
- // ListenableFuture's contract is that it will not throw unchecked
- // exceptions, so log the bad runnable and/or executor and swallow it.
- log.log(Level.SEVERE, "RuntimeException while executing runnable "
- + listener + " with executor " + executor, e);
- }
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return false;
- }
-
- @Override
- public abstract V get() throws ExecutionException;
-
- @Override
- public V get(long timeout, TimeUnit unit) throws ExecutionException {
- checkNotNull(unit);
- return get();
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return true;
- }
- }
-
- private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> {
-
- @Nullable private final V value;
-
- ImmediateSuccessfulFuture(@Nullable V value) {
- this.value = value;
- }
-
- @Override
- public V get() {
- return value;
- }
- }
-
- private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception>
- extends ImmediateFuture<V> implements CheckedFuture<V, X> {
-
- @Nullable private final V value;
-
- ImmediateSuccessfulCheckedFuture(@Nullable V value) {
- this.value = value;
- }
-
- @Override
- public V get() {
- return value;
- }
-
- @Override
- public V checkedGet() {
- return value;
- }
-
- @Override
- public V checkedGet(long timeout, TimeUnit unit) {
- checkNotNull(unit);
- return value;
- }
- }
-
- private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> {
-
- private final Throwable thrown;
-
- ImmediateFailedFuture(Throwable thrown) {
- this.thrown = thrown;
- }
-
- @Override
- public V get() throws ExecutionException {
- throw new ExecutionException(thrown);
- }
- }
-
- private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> {
-
- private final CancellationException thrown;
-
- ImmediateCancelledFuture() {
- this.thrown = new CancellationException("Immediate cancelled future.");
- }
-
- @Override
- public boolean isCancelled() {
- return true;
- }
-
- @Override
- public V get() {
- throw AbstractFuture.cancellationExceptionWithCause(
- "Task was cancelled.", thrown);
- }
- }
-
- private static class ImmediateFailedCheckedFuture<V, X extends Exception>
- extends ImmediateFuture<V> implements CheckedFuture<V, X> {
-
- private final X thrown;
-
- ImmediateFailedCheckedFuture(X thrown) {
- this.thrown = thrown;
- }
-
- @Override
- public V get() throws ExecutionException {
- throw new ExecutionException(thrown);
- }
-
- @Override
- public V checkedGet() throws X {
- throw thrown;
- }
-
- @Override
- public V checkedGet(long timeout, TimeUnit unit) throws X {
- checkNotNull(unit);
- throw thrown;
- }
- }
-
/**
* Creates a {@code ListenableFuture} which has its value set immediately upon
* construction. The getters just return the value. This {@code Future} can't
@@ -237,7 +88,9 @@ public final class Futures {
* {@code true}.
*/
public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
- return new ImmediateSuccessfulFuture<V>(value);
+ SettableFuture<V> future = SettableFuture.create();
+ future.set(value);
+ return future;
}
/**
@@ -250,7 +103,14 @@ public final class Futures {
*/
public static <V, X extends Exception> CheckedFuture<V, X>
immediateCheckedFuture(@Nullable V value) {
- return new ImmediateSuccessfulCheckedFuture<V, X>(value);
+ SettableFuture<V> future = SettableFuture.create();
+ future.set(value);
+ return Futures.makeChecked(future, new Function<Exception, X>() {
+ @Override
+ public X apply(Exception e) {
+ throw new AssertionError("impossible");
+ }
+ });
}
/**
@@ -261,21 +121,15 @@ public final class Futures {
* method always returns {@code true}. Calling {@code get()} will immediately
* throw the provided {@code Throwable} wrapped in an {@code
* ExecutionException}.
+ *
+ * @throws Error if the throwable is an {@link Error}.
*/
public static <V> ListenableFuture<V> immediateFailedFuture(
Throwable throwable) {
checkNotNull(throwable);
- return new ImmediateFailedFuture<V>(throwable);
- }
-
- /**
- * Creates a {@code ListenableFuture} which is cancelled immediately upon
- * construction, so that {@code isCancelled()} always returns {@code true}.
- *
- * @since 14.0
- */
- public static <V> ListenableFuture<V> immediateCancelledFuture() {
- return new ImmediateCancelledFuture<V>();
+ SettableFuture<V> future = SettableFuture.create();
+ future.setException(throwable);
+ return future;
}
/**
@@ -284,224 +138,149 @@ public final class Futures {
*
* <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
* method always returns {@code true}. Calling {@code get()} will immediately
- * throw the provided {@code Exception} wrapped in an {@code
+ * throw the provided {@code Throwable} wrapped in an {@code
* ExecutionException}, and calling {@code checkedGet()} will throw the
* provided exception itself.
+ *
+ * @throws Error if the throwable is an {@link Error}.
*/
public static <V, X extends Exception> CheckedFuture<V, X>
- immediateFailedCheckedFuture(X exception) {
+ immediateFailedCheckedFuture(final X exception) {
checkNotNull(exception);
- return new ImmediateFailedCheckedFuture<V, X>(exception);
+ return makeChecked(Futures.<V>immediateFailedFuture(exception),
+ new Function<Exception, X>() {
+ @Override
+ public X apply(Exception e) {
+ return exception;
+ }
+ });
}
/**
- * Returns a {@code Future} whose result is taken from the given primary
- * {@code input} or, if the primary input fails, from the {@code Future}
- * provided by the {@code fallback}. {@link FutureFallback#create} is not
- * invoked until the primary input has failed, so if the primary input
- * succeeds, it is never invoked. If, during the invocation of {@code
- * fallback}, an exception is thrown, this exception is used as the result of
- * the output {@code Future}.
- *
- * <p>Below is an example of a fallback that returns a default value if an
- * exception occurs:
- *
- * <pre> {@code
- * ListenableFuture<Integer> fetchCounterFuture = ...;
- *
- * // Falling back to a zero counter in case an exception happens when
- * // processing the RPC to fetch counters.
- * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
- * fetchCounterFuture, new FutureFallback<Integer>() {
- * public ListenableFuture<Integer> create(Throwable t) {
- * // Returning "0" as the default for the counter when the
- * // exception happens.
- * return immediateFuture(0);
- * }
- * });
- * }</pre>
- *
- * The fallback can also choose to propagate the original exception when
- * desired:
+ * <p>Returns a new {@code ListenableFuture} whose result is asynchronously
+ * derived from the result of the given {@code Future}. More precisely, the
+ * returned {@code Future} takes its result from a {@code Future} produced by
+ * applying the given {@code Function} to the result of the original {@code
+ * Future}. Example:
*
* <pre> {@code
- * ListenableFuture<Integer> fetchCounterFuture = ...;
- *
- * // Falling back to a zero counter only in case the exception was a
- * // TimeoutException.
- * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
- * fetchCounterFuture, new FutureFallback<Integer>() {
- * public ListenableFuture<Integer> create(Throwable t) {
- * if (t instanceof TimeoutException) {
- * return immediateFuture(0);
- * }
- * return immediateFailedFuture(t);
+ * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
+ * Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
+ * new Function<RowKey, ListenableFuture<QueryResult>>() {
+ * public ListenableFuture<QueryResult> apply(RowKey rowKey) {
+ * return dataService.read(rowKey);
* }
- * });
+ * };
+ * ListenableFuture<QueryResult> queryFuture =
+ * chain(rowKeyFuture, queryFunction);
* }</pre>
*
- * Note: If the derived {@code Future} is slow or heavyweight to create
- * (whether the {@code Future} itself is slow or heavyweight to complete is
- * irrelevant), consider {@linkplain #withFallback(ListenableFuture,
- * FutureFallback, Executor) supplying an executor}. If you do not supply an
- * executor, {@code withFallback} will use {@link
- * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
- * caveats for heavier operations. For example, the call to {@code
- * fallback.create} may run on an unpredictable or undesirable thread:
- *
- * <ul>
- * <li>If the input {@code Future} is done at the time {@code withFallback}
- * is called, {@code withFallback} will call {@code fallback.create} inline.
- * <li>If the input {@code Future} is not yet done, {@code withFallback} will
- * schedule {@code fallback.create} to be run by the thread that completes
- * the input {@code Future}, which may be an internal system thread such as
- * an RPC network thread.
- * </ul>
+ * <p>Note: This overload of {@code chain} is designed for cases in which the
+ * work of creating the derived future is fast and lightweight, as the method
+ * does not accept an {@code Executor} in which to perform the the work. For
+ * heavier derivations, this overload carries some caveats: First, the thread
+ * that the derivation runs in depends on whether the input {@code Future} is
+ * done at the time {@code chain} is called. In particular, if called late,
+ * {@code chain} will run the derivation in the thread that called {@code
+ * chain}. Second, derivations may run in an internal thread of the system
+ * responsible for the input {@code Future}, such as an RPC network thread.
+ * Finally, during the execution of a {@code sameThreadExecutor} {@code
+ * chain} function, all other registered but unexecuted listeners are
+ * prevented from running, even if those listeners are to run in other
+ * executors.
*
- * Also note that, regardless of which thread executes {@code
- * fallback.create}, all other registered but unexecuted listeners are
- * prevented from running during its execution, even if those listeners are
- * to run in other executors.
+ * <p>The returned {@code Future} attempts to keep its cancellation state in
+ * sync with that of the input future and that of the future returned by the
+ * chain function. That is, if the returned {@code Future} is cancelled, it
+ * will attempt to cancel the other two, and if either of the other two is
+ * cancelled, the returned {@code Future} will receive a callback in which it
+ * will attempt to cancel itself.
*
- * @param input the primary input {@code Future}
- * @param fallback the {@link FutureFallback} implementation to be called if
- * {@code input} fails
- * @since 14.0
+ * @param input The future to chain
+ * @param function A function to chain the results of the provided future
+ * to the results of the returned future. This will be run in the thread
+ * that notifies input it is complete.
+ * @return A future that holds result of the chain.
+ * @deprecated Convert your {@code Function} to a {@code AsyncFunction}, and
+ * use {@link #transform(ListenableFuture, AsyncFunction)}. This method is
+ * scheduled to be removed from Guava in Guava release 12.0.
*/
- public static <V> ListenableFuture<V> withFallback(
- ListenableFuture<? extends V> input,
- FutureFallback<? extends V> fallback) {
- return withFallback(input, fallback, sameThreadExecutor());
+ @Deprecated
+ public static <I, O> ListenableFuture<O> chain(
+ ListenableFuture<I> input,
+ Function<? super I, ? extends ListenableFuture<? extends O>> function) {
+ return chain(input, function, MoreExecutors.sameThreadExecutor());
}
/**
- * Returns a {@code Future} whose result is taken from the given primary
- * {@code input} or, if the primary input fails, from the {@code Future}
- * provided by the {@code fallback}. {@link FutureFallback#create} is not
- * invoked until the primary input has failed, so if the primary input
- * succeeds, it is never invoked. If, during the invocation of {@code
- * fallback}, an exception is thrown, this exception is used as the result of
- * the output {@code Future}.
- *
- * <p>Below is an example of a fallback that returns a default value if an
- * exception occurs:
+ * <p>Returns a new {@code ListenableFuture} whose result is asynchronously
+ * derived from the result of the given {@code Future}. More precisely, the
+ * returned {@code Future} takes its result from a {@code Future} produced by
+ * applying the given {@code Function} to the result of the original {@code
+ * Future}. Example:
*
* <pre> {@code
- * ListenableFuture<Integer> fetchCounterFuture = ...;
- *
- * // Falling back to a zero counter in case an exception happens when
- * // processing the RPC to fetch counters.
- * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
- * fetchCounterFuture, new FutureFallback<Integer>() {
- * public ListenableFuture<Integer> create(Throwable t) {
- * // Returning "0" as the default for the counter when the
- * // exception happens.
- * return immediateFuture(0);
+ * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
+ * Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
+ * new Function<RowKey, ListenableFuture<QueryResult>>() {
+ * public ListenableFuture<QueryResult> apply(RowKey rowKey) {
+ * return dataService.read(rowKey);
* }
- * }, sameThreadExecutor());
+ * };
+ * ListenableFuture<QueryResult> queryFuture =
+ * chain(rowKeyFuture, queryFunction, executor);
* }</pre>
*
- * The fallback can also choose to propagate the original exception when
- * desired:
- *
- * <pre> {@code
- * ListenableFuture<Integer> fetchCounterFuture = ...;
- *
- * // Falling back to a zero counter only in case the exception was a
- * // TimeoutException.
- * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
- * fetchCounterFuture, new FutureFallback<Integer>() {
- * public ListenableFuture<Integer> create(Throwable t) {
- * if (t instanceof TimeoutException) {
- * return immediateFuture(0);
- * }
- * return immediateFailedFuture(t);
- * }
- * }, sameThreadExecutor());
- * }</pre>
+ * <p>The returned {@code Future} attempts to keep its cancellation state in
+ * sync with that of the input future and that of the future returned by the
+ * chain function. That is, if the returned {@code Future} is cancelled, it
+ * will attempt to cancel the other two, and if either of the other two is
+ * cancelled, the returned {@code Future} will receive a callback in which it
+ * will attempt to cancel itself.
*
- * When the execution of {@code fallback.create} is fast and lightweight
- * (though the {@code Future} it returns need not meet these criteria),
- * consider {@linkplain #withFallback(ListenableFuture, FutureFallback)
- * omitting the executor} or explicitly specifying {@code
- * sameThreadExecutor}. However, be aware of the caveats documented in the
- * link above.
- *
- * @param input the primary input {@code Future}
- * @param fallback the {@link FutureFallback} implementation to be called if
- * {@code input} fails
- * @param executor the executor that runs {@code fallback} if {@code input}
- * fails
- * @since 14.0
- */
- public static <V> ListenableFuture<V> withFallback(
- ListenableFuture<? extends V> input,
- FutureFallback<? extends V> fallback, Executor executor) {
- checkNotNull(fallback);
- return new FallbackFuture<V>(input, fallback, executor);
- }
-
- /**
- * A future that falls back on a second, generated future, in case its
- * original future fails.
+ * <p>Note: For cases in which the work of creating the derived future is
+ * fast and lightweight, consider {@linkplain Futures#chain(ListenableFuture,
+ * Function) the other overload} or explicit use of {@code
+ * sameThreadExecutor}. For heavier derivations, this choice carries some
+ * caveats: First, the thread that the derivation runs in depends on whether
+ * the input {@code Future} is done at the time {@code chain} is called. In
+ * particular, if called late, {@code chain} will run the derivation in the
+ * thread that called {@code chain}. Second, derivations may run in an
+ * internal thread of the system responsible for the input {@code Future},
+ * such as an RPC network thread. Finally, during the execution of a {@code
+ * sameThreadExecutor} {@code chain} function, all other registered but
+ * unexecuted listeners are prevented from running, even if those listeners
+ * are to run in other executors.
+ *
+ * @param input The future to chain
+ * @param function A function to chain the results of the provided future
+ * to the results of the returned future.
+ * @param executor Executor to run the function in.
+ * @return A future that holds result of the chain.
+ * @deprecated Convert your {@code Function} to a {@code AsyncFunction}, and
+ * use {@link #transform(ListenableFuture, AsyncFunction, Executor)}. This
+ * method is scheduled to be removed from Guava in Guava release 12.0.
*/
- private static class FallbackFuture<V> extends AbstractFuture<V> {
-
- private volatile ListenableFuture<? extends V> running;
-
- FallbackFuture(ListenableFuture<? extends V> input,
- final FutureFallback<? extends V> fallback,
- final Executor executor) {
- running = input;
- addCallback(running, new FutureCallback<V>() {
- @Override
- public void onSuccess(V value) {
- set(value);
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (isCancelled()) {
- return;
- }
- try {
- running = fallback.create(t);
- if (isCancelled()) { // in case cancel called in the meantime
- running.cancel(wasInterrupted());
- return;
- }
- addCallback(running, new FutureCallback<V>() {
- @Override
- public void onSuccess(V value) {
- set(value);
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (running.isCancelled()) {
- cancel(false);
- } else {
- setException(t);
- }
- }
- }, sameThreadExecutor());
- } catch (Exception e) {
- setException(e);
- } catch (Error e) {
- setException(e); // note: rethrows
+ @Deprecated
+ public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
+ final Function<? super I, ? extends ListenableFuture<? extends O>>
+ function,
+ Executor executor) {
+ checkNotNull(function);
+ ChainingListenableFuture<I, O> chain =
+ new ChainingListenableFuture<I, O>(new AsyncFunction<I, O>() {
+ @Override
+ /*
+ * All methods of ListenableFuture are covariant, and we don't expose
+ * the object anywhere that would allow it to be downcast.
+ */
+ @SuppressWarnings("unchecked")
+ public ListenableFuture<O> apply(I input) {
+ return (ListenableFuture) function.apply(input);
}
- }
- }, executor);
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- if (super.cancel(mayInterruptIfRunning)) {
- running.cancel(mayInterruptIfRunning);
- return true;
- }
- return false;
- }
+ }, input);
+ input.addListener(chain, executor);
+ return chain;
}
/**
@@ -523,28 +302,20 @@ public final class Futures {
* transform(rowKeyFuture, queryFunction);
* }</pre>
*
- * Note: If the derived {@code Future} is slow or heavyweight to create
- * (whether the {@code Future} itself is slow or heavyweight to complete is
- * irrelevant), consider {@linkplain #transform(ListenableFuture,
- * AsyncFunction, Executor) supplying an executor}. If you do not supply an
- * executor, {@code transform} will use {@link
- * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
- * caveats for heavier operations. For example, the call to {@code
- * function.apply} may run on an unpredictable or undesirable thread:
- *
- * <ul>
- * <li>If the input {@code Future} is done at the time {@code transform} is
- * called, {@code transform} will call {@code function.apply} inline.
- * <li>If the input {@code Future} is not yet done, {@code transform} will
- * schedule {@code function.apply} to be run by the thread that completes the
- * input {@code Future}, which may be an internal system thread such as an
- * RPC network thread.
- * </ul>
- *
- * Also note that, regardless of which thread executes {@code
- * function.apply}, all other registered but unexecuted listeners are
- * prevented from running during its execution, even if those listeners are
- * to run in other executors.
+ * <p>Note: This overload of {@code transform} is designed for cases in which
+ * the work of creating the derived {@code Future} is fast and lightweight,
+ * as the method does not accept an {@code Executor} in which to perform the
+ * the work. (The created {@code Future} itself need not complete quickly.)
+ * For heavier operations, this overload carries some caveats: First, the
+ * thread that {@code function.apply} runs in depends on whether the input
+ * {@code Future} is done at the time {@code transform} is called. In
+ * particular, if called late, {@code transform} will run the operation in
+ * the thread that called {@code transform}. Second, {@code function.apply}
+ * may run in an internal thread of the system responsible for the input
+ * {@code Future}, such as an RPC network thread. Finally, during the
+ * execution of a {@code sameThreadExecutor} {@code function.apply}, all
+ * other registered but unexecuted listeners are prevented from running, even
+ * if those listeners are to run in other executors.
*
* <p>The returned {@code Future} attempts to keep its cancellation state in
* sync with that of the input future and that of the future returned by the
@@ -591,11 +362,20 @@ public final class Futures {
* cancelled, the returned {@code Future} will receive a callback in which it
* will attempt to cancel itself.
*
- * <p>When the execution of {@code function.apply} is fast and lightweight
- * (though the {@code Future} it returns need not meet these criteria),
- * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting
- * the executor} or explicitly specifying {@code sameThreadExecutor}.
- * However, be aware of the caveats documented in the link above.
+ * <p>Note: For cases in which the work of creating the derived future is
+ * fast and lightweight, consider {@linkplain
+ * Futures#transform(ListenableFuture, Function) the other overload} or
+ * explicit use of {@code sameThreadExecutor}. For heavier derivations, this
+ * choice carries some caveats: First, the thread that {@code function.apply}
+ * runs in depends on whether the input {@code Future} is done at the time
+ * {@code transform} is called. In particular, if called late, {@code
+ * transform} will run the operation in the thread that called {@code
+ * transform}. Second, {@code function.apply} may run in an internal thread
+ * of the system responsible for the input {@code Future}, such as an RPC
+ * network thread. Finally, during the execution of a {@code
+ * sameThreadExecutor} {@code function.apply}, all other registered but
+ * unexecuted listeners are prevented from running, even if those listeners
+ * are to run in other executors.
*
* @param input The future to transform
* @param function A function to transform the result of the input future
@@ -631,26 +411,19 @@ public final class Futures {
* transform(queryFuture, rowsFunction);
* }</pre>
*
- * Note: If the transformation is slow or heavyweight, consider {@linkplain
- * #transform(ListenableFuture, Function, Executor) supplying an executor}.
- * If you do not supply an executor, {@code transform} will use {@link
- * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
- * caveats for heavier operations. For example, the call to {@code
- * function.apply} may run on an unpredictable or undesirable thread:
- *
- * <ul>
- * <li>If the input {@code Future} is done at the time {@code transform} is
- * called, {@code transform} will call {@code function.apply} inline.
- * <li>If the input {@code Future} is not yet done, {@code transform} will
- * schedule {@code function.apply} to be run by the thread that completes the
- * input {@code Future}, which may be an internal system thread such as an
- * RPC network thread.
- * </ul>
- *
- * Also note that, regardless of which thread executes {@code
- * function.apply}, all other registered but unexecuted listeners are
- * prevented from running during its execution, even if those listeners are
- * to run in other executors.
+ * <p>Note: This overload of {@code transform} is designed for cases in which
+ * the transformation is fast and lightweight, as the method does not accept
+ * an {@code Executor} in which to perform the the work. For heavier
+ * transformations, this overload carries some caveats: First, the thread
+ * that the transformation runs in depends on whether the input {@code
+ * Future} is done at the time {@code transform} is called. In particular, if
+ * called late, {@code transform} will perform the transformation in the
+ * thread that called {@code transform}. Second, transformations may run in
+ * an internal thread of the system responsible for the input {@code Future},
+ * such as an RPC network thread. Finally, during the execution of a {@code
+ * sameThreadExecutor} transformation, all other registered but unexecuted
+ * listeners are prevented from running, even if those listeners are to run
+ * in other executors.
*
* <p>The returned {@code Future} attempts to keep its cancellation state in
* sync with that of the input future. That is, if the returned {@code Future}
@@ -661,16 +434,16 @@ public final class Futures {
* <p>An example use of this method is to convert a serializable object
* returned from an RPC into a POJO.
*
- * @param input The future to transform
+ * @param future The future to transform
* @param function A Function to transform the results of the provided future
* to the results of the returned future. This will be run in the thread
* that notifies input it is complete.
* @return A future that holds result of the transformation.
* @since 9.0 (in 1.0 as {@code compose})
*/
- public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
+ public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
final Function<? super I, ? extends O> function) {
- return transform(input, function, MoreExecutors.sameThreadExecutor());
+ return transform(future, function, MoreExecutors.sameThreadExecutor());
}
/**
@@ -699,29 +472,39 @@ public final class Futures {
* <p>An example use of this method is to convert a serializable object
* returned from an RPC into a POJO.
*
- * <p>When the transformation is fast and lightweight, consider {@linkplain
- * #transform(ListenableFuture, Function) omitting the executor} or
- * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
- * caveats documented in the link above.
- *
- * @param input The future to transform
+ * <p>Note: For cases in which the transformation is fast and lightweight,
+ * consider {@linkplain Futures#transform(ListenableFuture, Function) the
+ * other overload} or explicit use of {@link
+ * MoreExecutors#sameThreadExecutor}. For heavier transformations, this
+ * choice carries some caveats: First, the thread that the transformation
+ * runs in depends on whether the input {@code Future} is done at the time
+ * {@code transform} is called. In particular, if called late, {@code
+ * transform} will perform the transformation in the thread that called
+ * {@code transform}. Second, transformations may run in an internal thread
+ * of the system responsible for the input {@code Future}, such as an RPC
+ * network thread. Finally, during the execution of a {@code
+ * sameThreadExecutor} transformation, all other registered but unexecuted
+ * listeners are prevented from running, even if those listeners are to run
+ * in other executors.
+ *
+ * @param future The future to transform
* @param function A Function to transform the results of the provided future
* to the results of the returned future.
* @param executor Executor to run the function in.
* @return A future that holds result of the transformation.
* @since 9.0 (in 2.0 as {@code compose})
*/
- public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
+ public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
final Function<? super I, ? extends O> function, Executor executor) {
checkNotNull(function);
- AsyncFunction<I, O> wrapperFunction
- = new AsyncFunction<I, O>() {
+ Function<I, ListenableFuture<O>> wrapperFunction
+ = new Function<I, ListenableFuture<O>>() {
@Override public ListenableFuture<O> apply(I input) {
O output = function.apply(input);
return immediateFuture(output);
}
};
- return transform(input, wrapperFunction, executor);
+ return chain(future, wrapperFunction, executor);
}
/**
@@ -741,43 +524,43 @@ public final class Futures {
* who don't have a {@code ListenableFuture} available and
* do not mind repeated, lazy function evaluation.
*
- * @param input The future to transform
+ * @param future The future to transform
* @param function A Function to transform the results of the provided future
* to the results of the returned future.
* @return A future that returns the result of the transformation.
* @since 10.0
*/
@Beta
- public static <I, O> Future<O> lazyTransform(final Future<I> input,
+ public static <I, O> Future<O> lazyTransform(final Future<I> future,
final Function<? super I, ? extends O> function) {
- checkNotNull(input);
+ checkNotNull(future);
checkNotNull(function);
return new Future<O>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
- return input.cancel(mayInterruptIfRunning);
+ return future.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
- return input.isCancelled();
+ return future.isCancelled();
}
@Override
public boolean isDone() {
- return input.isDone();
+ return future.isDone();
}
@Override
public O get() throws InterruptedException, ExecutionException {
- return applyTransformation(input.get());
+ return applyTransformation(future.get());
}
@Override
public O get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
- return applyTransformation(input.get(timeout, unit));
+ return applyTransformation(future.get(timeout, unit));
}
private O applyTransformation(I input) throws ExecutionException {
@@ -806,6 +589,8 @@ public final class Futures {
private AsyncFunction<? super I, ? extends O> function;
private ListenableFuture<? extends I> inputFuture;
private volatile ListenableFuture<? extends O> outputFuture;
+ private final BlockingQueue<Boolean> mayInterruptIfRunningChannel =
+ new LinkedBlockingQueue<Boolean>(1);
private final CountDownLatch outputCreated = new CountDownLatch(1);
private ChainingListenableFuture(
@@ -815,6 +600,90 @@ public final class Futures {
this.inputFuture = checkNotNull(inputFuture);
}
+ /**
+ * Delegate the get() to the input and output futures, in case
+ * their implementations defer starting computation until their
+ * own get() is invoked.
+ */
+ @Override
+ public O get() throws InterruptedException, ExecutionException {
+ if (!isDone()) {
+ // Invoking get on the inputFuture will ensure our own run()
+ // method below is invoked as a listener when inputFuture sets
+ // its value. Therefore when get() returns we should then see
+ // the outputFuture be created.
+ ListenableFuture<? extends I> inputFuture = this.inputFuture;
+ if (inputFuture != null) {
+ inputFuture.get();
+ }
+
+ // If our listener was scheduled to run on an executor we may
+ // need to wait for our listener to finish running before the
+ // outputFuture has been constructed by the function.
+ outputCreated.await();
+
+ // Like above with the inputFuture, we have a listener on
+ // the outputFuture that will set our own value when its
+ // value is set. Invoking get will ensure the output can
+ // complete and invoke our listener, so that we can later
+ // get the result.
+ ListenableFuture<? extends O> outputFuture = this.outputFuture;
+ if (outputFuture != null) {
+ outputFuture.get();
+ }
+ }
+ return super.get();
+ }
+
+ /**
+ * Delegate the get() to the input and output futures, in case
+ * their implementations defer starting computation until their
+ * own get() is invoked.
+ */
+ @Override
+ public O get(long timeout, TimeUnit unit) throws TimeoutException,
+ ExecutionException, InterruptedException {
+ if (!isDone()) {
+ // Use a single time unit so we can decrease remaining timeout
+ // as we wait for various phases to complete.
+ if (unit != NANOSECONDS) {
+ timeout = NANOSECONDS.convert(timeout, unit);
+ unit = NANOSECONDS;
+ }
+
+ // Invoking get on the inputFuture will ensure our own run()
+ // method below is invoked as a listener when inputFuture sets
+ // its value. Therefore when get() returns we should then see
+ // the outputFuture be created.
+ ListenableFuture<? extends I> inputFuture = this.inputFuture;
+ if (inputFuture != null) {
+ long start = System.nanoTime();
+ inputFuture.get(timeout, unit);
+ timeout -= Math.max(0, System.nanoTime() - start);
+ }
+
+ // If our listener was scheduled to run on an executor we may
+ // need to wait for our listener to finish running before the
+ // outputFuture has been constructed by the function.
+ long start = System.nanoTime();
+ if (!outputCreated.await(timeout, unit)) {
+ throw new TimeoutException();
+ }
+ timeout -= Math.max(0, System.nanoTime() - start);
+
+ // Like above with the inputFuture, we have a listener on
+ // the outputFuture that will set our own value when its
+ // value is set. Invoking get will ensure the output can
+ // complete and invoke our listener, so that we can later
+ // get the result.
+ ListenableFuture<? extends O> outputFuture = this.outputFuture;
+ if (outputFuture != null) {
+ outputFuture.get(timeout, unit);
+ }
+ }
+ return super.get(timeout, unit);
+ }
+
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
/*
@@ -824,6 +693,7 @@ public final class Futures {
if (super.cancel(mayInterruptIfRunning)) {
// This should never block since only one thread is allowed to cancel
// this Future.
+ putUninterruptibly(mayInterruptIfRunningChannel, mayInterruptIfRunning);
cancel(inputFuture, mayInterruptIfRunning);
cancel(outputFuture, mayInterruptIfRunning);
return true;
@@ -859,7 +729,13 @@ public final class Futures {
final ListenableFuture<? extends O> outputFuture = this.outputFuture =
function.apply(sourceResult);
if (isCancelled()) {
- outputFuture.cancel(wasInterrupted());
+ // Handles the case where cancel was called while the function was
+ // being applied.
+ // There is a gap in cancel(boolean) between calling sync.cancel()
+ // and storing the value of mayInterruptIfRunning, so this thread
+ // needs to block, waiting for that value.
+ outputFuture.cancel(
+ takeUninterruptibly(mayInterruptIfRunningChannel));
this.outputFuture = null;
return;
}
@@ -907,52 +783,14 @@ public final class Futures {
}
/**
- * Returns a new {@code ListenableFuture} whose result is the product of
- * calling {@code get()} on the {@code Future} nested within the given {@code
- * Future}, effectively chaining the futures one after the other. Example:
- *
- * <pre> {@code
- * SettableFuture<ListenableFuture<String>> nested = SettableFuture.create();
- * ListenableFuture<String> dereferenced = dereference(nested);
- * }</pre>
- *
- * <p>This call has the same cancellation and execution semantics as {@link
- * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code
- * Future} attempts to keep its cancellation state in sync with both the
- * input {@code Future} and the nested {@code Future}. The transformation
- * is very lightweight and therefore takes place in the thread that called
- * {@code dereference}.
- *
- * @param nested The nested future to transform.
- * @return A future that holds result of the inner future.
- * @since 13.0
- */
- @Beta
- @SuppressWarnings({"rawtypes", "unchecked"})
- public static <V> ListenableFuture<V> dereference(
- ListenableFuture<? extends ListenableFuture<? extends V>> nested) {
- return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER);
- }
-
- /**
- * Helper {@code Function} for {@link #dereference}.
- */
- private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER =
- new AsyncFunction<ListenableFuture<Object>, Object>() {
- @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) {
- return input;
- }
- };
-
- /**
* Creates a new {@code ListenableFuture} whose value is a list containing the
* values of all its input futures, if all succeed. If any input fails, the
* returned future fails.
*
* <p>The list of results is in the same order as the input list.
*
- * <p>Canceling this future will attempt to cancel all the component futures,
- * and if any of the provided futures fails or is canceled, this one is,
+ * <p>Canceling this future does not cancel any of the component futures;
+ * however, if any of the provided futures fails or is canceled, this one is,
* too.
*
* @param futures futures to combine
@@ -963,7 +801,7 @@ public final class Futures {
@Beta
public static <V> ListenableFuture<List<V>> allAsList(
ListenableFuture<? extends V>... futures) {
- return listFuture(ImmutableList.copyOf(futures), true,
+ return new ListFuture<V>(ImmutableList.copyOf(futures), true,
MoreExecutors.sameThreadExecutor());
}
@@ -974,8 +812,8 @@ public final class Futures {
*
* <p>The list of results is in the same order as the input list.
*
- * <p>Canceling this future will attempt to cancel all the component futures,
- * and if any of the provided futures fails or is canceled, this one is,
+ * <p>Canceling this future does not cancel any of the component futures;
+ * however, if any of the provided futures fails or is canceled, this one is,
* too.
*
* @param futures futures to combine
@@ -986,7 +824,7 @@ public final class Futures {
@Beta
public static <V> ListenableFuture<List<V>> allAsList(
Iterable<? extends ListenableFuture<? extends V>> futures) {
- return listFuture(ImmutableList.copyOf(futures), true,
+ return new ListFuture<V>(ImmutableList.copyOf(futures), true,
MoreExecutors.sameThreadExecutor());
}
@@ -998,8 +836,6 @@ public final class Futures {
* indistinguishable from the future having a successful value of
* {@code null}).
*
- * <p>Canceling this future will attempt to cancel all the component futures.
- *
* @param futures futures to combine
* @return a future that provides a list of the results of the component
* futures
@@ -1008,7 +844,7 @@ public final class Futures {
@Beta
public static <V> ListenableFuture<List<V>> successfulAsList(
ListenableFuture<? extends V>... futures) {
- return listFuture(ImmutableList.copyOf(futures), false,
+ return new ListFuture<V>(ImmutableList.copyOf(futures), false,
MoreExecutors.sameThreadExecutor());
}
@@ -1020,8 +856,6 @@ public final class Futures {
* indistinguishable from the future having a successful value of
* {@code null}).
*
- * <p>Canceling this future will attempt to cancel all the component futures.
- *
* @param futures futures to combine
* @return a future that provides a list of the results of the component
* futures
@@ -1030,7 +864,7 @@ public final class Futures {
@Beta
public static <V> ListenableFuture<List<V>> successfulAsList(
Iterable<? extends ListenableFuture<? extends V>> futures) {
- return listFuture(ImmutableList.copyOf(futures), false,
+ return new ListFuture<V>(ImmutableList.copyOf(futures), false,
MoreExecutors.sameThreadExecutor());
}
@@ -1055,26 +889,19 @@ public final class Futures {
* }
* });}</pre>
*
- * Note: If the callback is slow or heavyweight, consider {@linkplain
- * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an
- * executor}. If you do not supply an executor, {@code addCallback} will use
- * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries
- * some caveats for heavier operations. For example, the callback may run on
- * an unpredictable or undesirable thread:
- *
- * <ul>
- * <li>If the input {@code Future} is done at the time {@code addCallback} is
- * called, {@code addCallback} will execute the callback inline.
- * <li>If the input {@code Future} is not yet done, {@code addCallback} will
- * schedule the callback to be run by the thread that completes the input
- * {@code Future}, which may be an internal system thread such as an RPC
- * network thread.
- * </ul>
- *
- * Also note that, regardless of which thread executes the callback, all
- * other registered but unexecuted listeners are prevented from running
- * during its execution, even if those listeners are to run in other
- * executors.
+ * <p>Note: This overload of {@code addCallback} is designed for cases in
+ * which the callack is fast and lightweight, as the method does not accept
+ * an {@code Executor} in which to perform the the work. For heavier
+ * callbacks, this overload carries some caveats: First, the thread that the
+ * callback runs in depends on whether the input {@code Future} is done at the
+ * time {@code addCallback} is called and on whether the input {@code Future}
+ * is ever cancelled. In particular, {@code addCallback} may execute the
+ * callback in the thread that calls {@code addCallback} or {@code
+ * Future.cancel}. Second, callbacks may run in an internal thread of the
+ * system responsible for the input {@code Future}, such as an RPC network
+ * thread. Finally, during the execution of a {@code sameThreadExecutor}
+ * callback, all other registered but unexecuted listeners are prevented from
+ * running, even if those listeners are to run in other executors.
*
* <p>For a more general interface to attach a completion listener to a
* {@code Future}, see {@link ListenableFuture#addListener addListener}.
@@ -1111,10 +938,20 @@ public final class Futures {
* }
* });}</pre>
*
- * When the callback is fast and lightweight, consider {@linkplain
- * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or
- * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
- * caveats documented in the link above.
+ * When the callback is fast and lightweight consider {@linkplain
+ * Futures#addCallback(ListenableFuture, FutureCallback) the other overload}
+ * or explicit use of {@link MoreExecutors#sameThreadExecutor
+ * sameThreadExecutor}. For heavier callbacks, this choice carries some
+ * caveats: First, the thread that the callback runs in depends on whether
+ * the input {@code Future} is done at the time {@code addCallback} is called
+ * and on whether the input {@code Future} is ever cancelled. In particular,
+ * {@code addCallback} may execute the callback in the thread that calls
+ * {@code addCallback} or {@code Future.cancel}. Second, callbacks may run in
+ * an internal thread of the system responsible for the input {@code Future},
+ * such as an RPC network thread. Finally, during the execution of a {@code
+ * sameThreadExecutor} callback, all other registered but unexecuted
+ * listeners are prevented from running, even if those listeners are to run
+ * in other executors.
*
* <p>For a more general interface to attach a completion listener to a
* {@code Future}, see {@link ListenableFuture#addListener addListener}.
@@ -1131,22 +968,18 @@ public final class Futures {
Runnable callbackListener = new Runnable() {
@Override
public void run() {
- final V value;
try {
// TODO(user): (Before Guava release), validate that this
// is the thing for IE.
- value = getUninterruptibly(future);
+ V value = getUninterruptibly(future);
+ callback.onSuccess(value);
} catch (ExecutionException e) {
callback.onFailure(e.getCause());
- return;
} catch (RuntimeException e) {
callback.onFailure(e);
- return;
} catch (Error e) {
callback.onFailure(e);
- return;
}
- callback.onSuccess(value);
}
};
future.addListener(callbackListener, executor);
@@ -1434,53 +1267,49 @@ public final class Futures {
}
}
- private interface FutureCombiner<V, C> {
- C combine(List<Optional<V>> values);
- }
-
- private static class CombinedFuture<V, C> extends AbstractFuture<C> {
- ImmutableCollection<? extends ListenableFuture<? extends V>> futures;
+ /**
+ * Class that implements {@link #allAsList} and {@link #successfulAsList}.
+ * The idea is to create a (null-filled) List and register a listener with
+ * each component future to fill out the value in the List when that future
+ * completes.
+ */
+ private static class ListFuture<V> extends AbstractFuture<List<V>> {
+ ImmutableList<? extends ListenableFuture<? extends V>> futures;
final boolean allMustSucceed;
final AtomicInteger remaining;
- FutureCombiner<V, C> combiner;
- List<Optional<V>> values;
+ List<V> values;
- CombinedFuture(
- ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
- boolean allMustSucceed, Executor listenerExecutor,
- FutureCombiner<V, C> combiner) {
+ /**
+ * Constructor.
+ *
+ * @param futures all the futures to build the list from
+ * @param allMustSucceed whether a single failure or cancellation should
+ * propagate to this future
+ * @param listenerExecutor used to run listeners on all the passed in
+ * futures.
+ */
+ ListFuture(
+ final ImmutableList<? extends ListenableFuture<? extends V>> futures,
+ final boolean allMustSucceed, final Executor listenerExecutor) {
this.futures = futures;
+ this.values = Lists.newArrayListWithCapacity(futures.size());
this.allMustSucceed = allMustSucceed;
this.remaining = new AtomicInteger(futures.size());
- this.combiner = combiner;
- this.values = Lists.newArrayListWithCapacity(futures.size());
+
init(listenerExecutor);
}
- /**
- * Must be called at the end of the constructor.
- */
- protected void init(final Executor listenerExecutor) {
+ private void init(final Executor listenerExecutor) {
// First, schedule cleanup to execute when the Future is done.
addListener(new Runnable() {
@Override
public void run() {
- // Cancel all the component futures.
- if (CombinedFuture.this.isCancelled()) {
- for (ListenableFuture<?> future : CombinedFuture.this.futures) {
- future.cancel(CombinedFuture.this.wasInterrupted());
- }
- }
-
// By now the values array has either been set as the Future's value,
// or (in case of failure) is no longer useful.
- CombinedFuture.this.futures = null;
+ ListFuture.this.values = null;
// Let go of the memory held by other futures
- CombinedFuture.this.values = null;
-
- // The combiner may also hold state, so free that as well
- CombinedFuture.this.combiner = null;
+ ListFuture.this.futures = null;
}
}, MoreExecutors.sameThreadExecutor());
@@ -1488,7 +1317,7 @@ public final class Futures {
// Corner case: List is empty.
if (futures.isEmpty()) {
- set(combiner.combine(ImmutableList.<Optional<V>>of()));
+ set(Lists.newArrayList(values));
return;
}
@@ -1503,11 +1332,11 @@ public final class Futures {
// this loop, the last call to addListener() will callback to
// setOneValue(), transitively call our cleanup listener, and set
// this.futures to null.
- // This is not actually a problem, since the foreach only needs
- // this.futures to be non-null at the beginning of the loop.
- int i = 0;
- for (final ListenableFuture<? extends V> listenable : futures) {
- final int index = i++;
+ // We store a reference to futures to avoid the NPE.
+ ImmutableList<? extends ListenableFuture<? extends V>> localFutures = futures;
+ for (int i = 0; i < localFutures.size(); i++) {
+ final ListenableFuture<? extends V> listenable = localFutures.get(i);
+ final int index = i;
listenable.addListener(new Runnable() {
@Override
public void run() {
@@ -1521,12 +1350,12 @@ public final class Futures {
* Sets the value at the given index to that of the given future.
*/
private void setOneValue(int index, Future<? extends V> future) {
- List<Optional<V>> localValues = values;
+ List<V> localValues = values;
if (isDone() || localValues == null) {
// Some other future failed or has been cancelled, causing this one to
// also be cancelled or have an exception set. This should only happen
- // if allMustSucceed is true or if the output itself has been cancelled.
- checkState(allMustSucceed || isCancelled(),
+ // if allMustSucceed is true.
+ checkState(allMustSucceed,
"Future was done before all dependencies completed");
return;
}
@@ -1534,8 +1363,7 @@ public final class Futures {
try {
checkState(future.isDone(),
"Tried to set value from future which is not done");
- V returnValue = getUninterruptibly(future);
- localValues.set(index, Optional.fromNullable(returnValue));
+ localValues.set(index, getUninterruptibly(future));
} catch (CancellationException e) {
if (allMustSucceed) {
// Set ourselves as cancelled. Let the input futures keep running
@@ -1561,9 +1389,9 @@ public final class Futures {
int newRemaining = remaining.decrementAndGet();
checkState(newRemaining >= 0, "Less than 0 remaining futures");
if (newRemaining == 0) {
- FutureCombiner<V, C> localCombiner = combiner;
- if (localCombiner != null) {
- set(localCombiner.combine(localValues));
+ localValues = values;
+ if (localValues != null) {
+ set(Lists.newArrayList(localValues));
} else {
checkState(isDone());
}
@@ -1571,25 +1399,46 @@ public final class Futures {
}
}
- }
+ @Override
+ public List<V> get() throws InterruptedException, ExecutionException {
+ callAllGets();
- /** Used for {@link #allAsList} and {@link #successfulAsList}. */
- private static <V> ListenableFuture<List<V>> listFuture(
- ImmutableList<ListenableFuture<? extends V>> futures,
- boolean allMustSucceed, Executor listenerExecutor) {
- return new CombinedFuture<V, List<V>>(
- futures, allMustSucceed, listenerExecutor,
- new FutureCombiner<V, List<V>>() {
- @Override
- public List<V> combine(List<Optional<V>> values) {
- List<V> result = Lists.newArrayList();
- for (Optional<V> element : values) {
- result.add(element != null ? element.orNull() : null);
+ // This may still block in spite of the calls above, as the listeners may
+ // be scheduled for execution in other threads.
+ return super.get();
+ }
+
+ /**
+ * Calls the get method of all dependency futures to work around a bug in
+ * some ListenableFutures where the listeners aren't called until get() is
+ * called.
+ */
+ private void callAllGets() throws InterruptedException {
+ List<? extends ListenableFuture<? extends V>> oldFutures = futures;
+ if (oldFutures != null && !isDone()) {
+ for (ListenableFuture<? extends V> future : oldFutures) {
+ // We wait for a little while for the future, but if it's not done,
+ // we check that no other futures caused a cancellation or failure.
+ // This can introduce a delay of up to 10ms in reporting an exception.
+ while (!future.isDone()) {
+ try {
+ future.get();
+ } catch (Error e) {
+ throw e;
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (Throwable e) {
+ // ExecutionException / CancellationException / RuntimeException
+ if (allMustSucceed) {
+ return;
+ } else {
+ continue;
+ }
}
- // TODO(user): This should ultimately return an unmodifiableList
- return result;
}
- });
+ }
+ }
+ }
}
/**