aboutsummaryrefslogtreecommitdiffstats
path: root/guava/src/com/google/common/util
diff options
context:
space:
mode:
Diffstat (limited to 'guava/src/com/google/common/util')
-rw-r--r--guava/src/com/google/common/util/concurrent/AbstractExecutionThreadService.java42
-rw-r--r--guava/src/com/google/common/util/concurrent/AbstractFuture.java65
-rw-r--r--guava/src/com/google/common/util/concurrent/AbstractIdleService.java40
-rw-r--r--guava/src/com/google/common/util/concurrent/AbstractListeningExecutorService.java89
-rw-r--r--guava/src/com/google/common/util/concurrent/AbstractScheduledService.java89
-rw-r--r--guava/src/com/google/common/util/concurrent/AbstractService.java457
-rw-r--r--guava/src/com/google/common/util/concurrent/AsyncFunction.java3
-rw-r--r--guava/src/com/google/common/util/concurrent/AtomicDouble.java12
-rw-r--r--guava/src/com/google/common/util/concurrent/AtomicDoubleArray.java4
-rw-r--r--guava/src/com/google/common/util/concurrent/AtomicLongMap.java20
-rw-r--r--guava/src/com/google/common/util/concurrent/Atomics.java3
-rw-r--r--guava/src/com/google/common/util/concurrent/CycleDetectingLockFactory.java1038
-rw-r--r--guava/src/com/google/common/util/concurrent/ExecutionError.java10
-rw-r--r--guava/src/com/google/common/util/concurrent/ExecutionList.java3
-rw-r--r--guava/src/com/google/common/util/concurrent/FakeTimeLimiter.java6
-rw-r--r--guava/src/com/google/common/util/concurrent/ForwardingService.java23
-rw-r--r--guava/src/com/google/common/util/concurrent/FutureCallback.java3
-rw-r--r--guava/src/com/google/common/util/concurrent/FutureFallback.java48
-rw-r--r--guava/src/com/google/common/util/concurrent/Futures.java985
-rw-r--r--guava/src/com/google/common/util/concurrent/JdkFutureAdapters.java34
-rw-r--r--guava/src/com/google/common/util/concurrent/ListenableFuture.java38
-rw-r--r--guava/src/com/google/common/util/concurrent/ListenableFutureTask.java11
-rw-r--r--guava/src/com/google/common/util/concurrent/MoreExecutors.java299
-rw-r--r--guava/src/com/google/common/util/concurrent/RateLimiter.java690
-rw-r--r--guava/src/com/google/common/util/concurrent/Service.java202
-rw-r--r--guava/src/com/google/common/util/concurrent/ServiceManager.java724
-rw-r--r--guava/src/com/google/common/util/concurrent/Striped.java376
-rw-r--r--guava/src/com/google/common/util/concurrent/ThreadFactoryBuilder.java5
-rw-r--r--guava/src/com/google/common/util/concurrent/UncheckedExecutionException.java18
-rw-r--r--guava/src/com/google/common/util/concurrent/UncheckedTimeoutException.java8
-rw-r--r--guava/src/com/google/common/util/concurrent/Uninterruptibles.java12
-rw-r--r--guava/src/com/google/common/util/concurrent/package-info.java1
32 files changed, 789 insertions, 4569 deletions
diff --git a/guava/src/com/google/common/util/concurrent/AbstractExecutionThreadService.java b/guava/src/com/google/common/util/concurrent/AbstractExecutionThreadService.java
index a1cb641..8136d23 100644
--- a/guava/src/com/google/common/util/concurrent/AbstractExecutionThreadService.java
+++ b/guava/src/com/google/common/util/concurrent/AbstractExecutionThreadService.java
@@ -55,8 +55,7 @@ public abstract class AbstractExecutionThreadService implements Service {
shutDown();
} catch (Exception ignored) {
logger.log(Level.WARNING,
- "Error while attempting to shut down the service"
- + " after failure.", ignored);
+ "Error while attempting to shut down the service after failure.", ignored);
}
throw t;
}
@@ -78,14 +77,7 @@ public abstract class AbstractExecutionThreadService implements Service {
};
/**
- * Constructor for use by subclasses.
- */
- protected AbstractExecutionThreadService() {}
-
- /**
* Start the service. This method is invoked on the execution thread.
- *
- * <p>By default this method does nothing.
*/
protected void startUp() throws Exception {}
@@ -107,16 +99,12 @@ public abstract class AbstractExecutionThreadService implements Service {
/**
* Stop the service. This method is invoked on the execution thread.
- *
- * <p>By default this method does nothing.
*/
// TODO: consider supporting a TearDownTestCase-like API
protected void shutDown() throws Exception {}
/**
* Invoked to request the service to stop.
- *
- * <p>By default this method does nothing.
*/
protected void triggerShutdown() {}
@@ -129,19 +117,19 @@ public abstract class AbstractExecutionThreadService implements Service {
* promptly.
*
* <p>The default implementation returns a new {@link Executor} that sets the
- * name of its threads to the string returned by {@link #serviceName}
+ * name of its threads to the string returned by {@link #getServiceName}
*/
protected Executor executor() {
return new Executor() {
@Override
public void execute(Runnable command) {
- MoreExecutors.newThread(serviceName(), command).start();
+ new Thread(command, getServiceName()).start();
}
};
}
@Override public String toString() {
- return serviceName() + " [" + state() + "]";
+ return getServiceName() + " [" + state() + "]";
}
// We override instead of using ForwardingService so that these can be final.
@@ -171,28 +159,14 @@ public abstract class AbstractExecutionThreadService implements Service {
}
/**
- * @since 13.0
- */
- @Override public final void addListener(Listener listener, Executor executor) {
- delegate.addListener(listener, executor);
- }
-
- /**
- * @since 14.0
- */
- @Override public final Throwable failureCause() {
- return delegate.failureCause();
- }
-
- /**
- * Returns the name of this service. {@link AbstractExecutionThreadService}
- * may include the name in debugging output.
+ * Returns the name of this service. {@link AbstractExecutionThreadService} may include the name
+ * in debugging output.
*
* <p>Subclasses may override this method.
*
- * @since 14.0 (present in 10.0 as getServiceName)
+ * @since 10.0
*/
- protected String serviceName() {
+ protected String getServiceName() {
return getClass().getSimpleName();
}
}
diff --git a/guava/src/com/google/common/util/concurrent/AbstractFuture.java b/guava/src/com/google/common/util/concurrent/AbstractFuture.java
index e14a111..bef3d3d 100644
--- a/guava/src/com/google/common/util/concurrent/AbstractFuture.java
+++ b/guava/src/com/google/common/util/concurrent/AbstractFuture.java
@@ -70,11 +70,6 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> {
// The execution list to hold our executors.
private final ExecutionList executionList = new ExecutionList();
- /**
- * Constructor for use by subclasses.
- */
- protected AbstractFuture() {}
-
/*
* Improve the documentation of when InterruptedException is thrown. Our
* behavior matches the JDK's, but the JDK's documentation is misleading.
@@ -128,7 +123,7 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
- if (!sync.cancel(mayInterruptIfRunning)) {
+ if (!sync.cancel()) {
return false;
}
executionList.execute();
@@ -151,16 +146,6 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> {
}
/**
- * Returns true if this future was cancelled with {@code
- * mayInterruptIfRunning} set to {@code true}.
- *
- * @since 14.0
- */
- protected final boolean wasInterrupted() {
- return sync.wasInterrupted();
- }
-
- /**
* {@inheritDoc}
*
* @since 10.0
@@ -216,14 +201,13 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> {
* private subclass to hold the synchronizer. This synchronizer is used to
* implement the blocking and waiting calls as well as to handle state changes
* in a thread-safe manner. The current state of the future is held in the
- * Sync state, and the lock is released whenever the state changes to
- * {@link #COMPLETED}, {@link #CANCELLED}, or {@link #INTERRUPTED}
+ * Sync state, and the lock is released whenever the state changes to either
+ * {@link #COMPLETED} or {@link #CANCELLED}.
*
* <p>To avoid races between threads doing release and acquire, we transition
* to the final state in two steps. One thread will successfully CAS from
* RUNNING to COMPLETING, that thread will then set the result of the
- * computation, and only then transition to COMPLETED, CANCELLED, or
- * INTERRUPTED.
+ * computation, and only then transition to COMPLETED or CANCELLED.
*
* <p>We don't use the integer argument passed between acquire methods so we
* pass around a -1 everywhere.
@@ -237,7 +221,6 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> {
static final int COMPLETING = 1;
static final int COMPLETED = 2;
static final int CANCELLED = 4;
- static final int INTERRUPTED = 8;
private V value;
private Throwable exception;
@@ -309,9 +292,7 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> {
}
case CANCELLED:
- case INTERRUPTED:
- throw cancellationExceptionWithCause(
- "Task was cancelled.", exception);
+ throw new CancellationException("Task was cancelled.");
default:
throw new IllegalStateException(
@@ -320,25 +301,17 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> {
}
/**
- * Checks if the state is {@link #COMPLETED}, {@link #CANCELLED}, or {@link
- * INTERRUPTED}.
+ * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
*/
boolean isDone() {
- return (getState() & (COMPLETED | CANCELLED | INTERRUPTED)) != 0;
+ return (getState() & (COMPLETED | CANCELLED)) != 0;
}
/**
- * Checks if the state is {@link #CANCELLED} or {@link #INTERRUPTED}.
+ * Checks if the state is {@link #CANCELLED}.
*/
boolean isCancelled() {
- return (getState() & (CANCELLED | INTERRUPTED)) != 0;
- }
-
- /**
- * Checks if the state is {@link #INTERRUPTED}.
- */
- boolean wasInterrupted() {
- return getState() == INTERRUPTED;
+ return getState() == CANCELLED;
}
/**
@@ -356,10 +329,10 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> {
}
/**
- * Transition to the CANCELLED or INTERRUPTED state.
+ * Transition to the CANCELLED state.
*/
- boolean cancel(boolean interrupt) {
- return complete(null, null, interrupt ? INTERRUPTED : CANCELLED);
+ boolean cancel() {
+ return complete(null, null, CANCELLED);
}
/**
@@ -367,8 +340,7 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> {
* be set but not both. The {@code finalState} is the state to change to
* from {@link #RUNNING}. If the state is not in the RUNNING state we
* return {@code false} after waiting for the state to be set to a valid
- * final state ({@link #COMPLETED}, {@link #CANCELLED}, or {@link
- * #INTERRUPTED}).
+ * final state ({@link #COMPLETED} or {@link #CANCELLED}).
*
* @param v the value to set as the result of the computation.
* @param t the exception to set as the result of the computation.
@@ -381,9 +353,7 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> {
// If this thread successfully transitioned to COMPLETING, set the value
// and exception and then release to the final state.
this.value = v;
- // Don't actually construct a CancellationException until necessary.
- this.exception = ((finalState & (CANCELLED | INTERRUPTED)) != 0)
- ? new CancellationException("Future.cancel() was called.") : t;
+ this.exception = t;
releaseShared(finalState);
} else if (getState() == COMPLETING) {
// If some other thread is currently completing the future, block until
@@ -393,11 +363,4 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> {
return doCompletion;
}
}
-
- static final CancellationException cancellationExceptionWithCause(
- @Nullable String message, @Nullable Throwable cause) {
- CancellationException exception = new CancellationException(message);
- exception.initCause(cause);
- return exception;
- }
}
diff --git a/guava/src/com/google/common/util/concurrent/AbstractIdleService.java b/guava/src/com/google/common/util/concurrent/AbstractIdleService.java
index 96a6ff3..504a6bc 100644
--- a/guava/src/com/google/common/util/concurrent/AbstractIdleService.java
+++ b/guava/src/com/google/common/util/concurrent/AbstractIdleService.java
@@ -37,7 +37,7 @@ public abstract class AbstractIdleService implements Service {
/* use AbstractService for state management */
private final Service delegate = new AbstractService() {
@Override protected final void doStart() {
- executor().execute(new Runnable() {
+ executor(State.STARTING).execute(new Runnable() {
@Override public void run() {
try {
startUp();
@@ -51,7 +51,7 @@ public abstract class AbstractIdleService implements Service {
}
@Override protected final void doStop() {
- executor().execute(new Runnable() {
+ executor(State.STOPPING).execute(new Runnable() {
@Override public void run() {
try {
shutDown();
@@ -65,9 +65,6 @@ public abstract class AbstractIdleService implements Service {
}
};
- /** Constructor for use by subclasses. */
- protected AbstractIdleService() {}
-
/** Start the service. */
protected abstract void startUp() throws Exception;
@@ -81,19 +78,22 @@ public abstract class AbstractIdleService implements Service {
* priority. The returned executor's {@link Executor#execute(Runnable)
* execute()} method is called when this service is started and stopped,
* and should return promptly.
+ *
+ * @param state {@link Service.State#STARTING} or
+ * {@link Service.State#STOPPING}, used by the default implementation for
+ * naming the thread
*/
- protected Executor executor() {
- final State state = state();
+ protected Executor executor(final State state) {
return new Executor() {
@Override
public void execute(Runnable command) {
- MoreExecutors.newThread(serviceName() + " " + state, command).start();
+ new Thread(command, getServiceName() + " " + state).start();
}
};
}
@Override public String toString() {
- return serviceName() + " [" + state() + "]";
+ return getServiceName() + " [" + state() + "]";
}
// We override instead of using ForwardingService so that these can be final.
@@ -122,27 +122,7 @@ public abstract class AbstractIdleService implements Service {
return delegate.stopAndWait();
}
- /**
- * @since 13.0
- */
- @Override public final void addListener(Listener listener, Executor executor) {
- delegate.addListener(listener, executor);
- }
-
- /**
- * @since 14.0
- */
- @Override public final Throwable failureCause() {
- return delegate.failureCause();
- }
-
- /**
- * Returns the name of this service. {@link AbstractIdleService} may include the name in debugging
- * output.
- *
- * @since 14.0
- */
- protected String serviceName() {
+ private String getServiceName() {
return getClass().getSimpleName();
}
}
diff --git a/guava/src/com/google/common/util/concurrent/AbstractListeningExecutorService.java b/guava/src/com/google/common/util/concurrent/AbstractListeningExecutorService.java
index c3e33a5..24f596f 100644
--- a/guava/src/com/google/common/util/concurrent/AbstractListeningExecutorService.java
+++ b/guava/src/com/google/common/util/concurrent/AbstractListeningExecutorService.java
@@ -14,9 +14,7 @@
package com.google.common.util.concurrent;
-import static com.google.common.util.concurrent.MoreExecutors.invokeAnyImpl;
-
-import com.google.common.annotations.Beta;
+import static com.google.common.base.Preconditions.checkArgument;
import java.util.ArrayList;
import java.util.Collection;
@@ -25,12 +23,11 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import javax.annotation.Nullable;
-
/**
* Implements {@link ListeningExecutorService} execution methods atop the abstract {@link #execute}
* method. More concretely, the {@code submit}, {@code invokeAny} and {@code invokeAll} methods
@@ -40,17 +37,15 @@ import javax.annotation.Nullable;
* termination.
*
* @author Doug Lea
- * @since 14.0
*/
-@Beta
-public abstract class AbstractListeningExecutorService implements ListeningExecutorService {
+abstract class AbstractListeningExecutorService implements ListeningExecutorService {
@Override public ListenableFuture<?> submit(Runnable task) {
ListenableFutureTask<Void> ftask = ListenableFutureTask.create(task, null);
execute(ftask);
return ftask;
}
- @Override public <T> ListenableFuture<T> submit(Runnable task, @Nullable T result) {
+ @Override public <T> ListenableFuture<T> submit(Runnable task, T result) {
ListenableFutureTask<T> ftask = ListenableFutureTask.create(task, result);
execute(ftask);
return ftask;
@@ -62,10 +57,82 @@ public abstract class AbstractListeningExecutorService implements ListeningExecu
return ftask;
}
+ /**
+ * the main mechanics of invokeAny.
+ */
+ private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ int ntasks = tasks.size();
+ checkArgument(ntasks > 0);
+ List<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
+ ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
+
+ // For efficiency, especially in executors with limited
+ // parallelism, check to see if previously submitted tasks are
+ // done before submitting more of them. This interleaving
+ // plus the exception mechanics account for messiness of main
+ // loop.
+
+ try {
+ // Record exceptions so that if we fail to obtain any
+ // result, we can throw the last exception we got.
+ ExecutionException ee = null;
+ long lastTime = timed ? System.nanoTime() : 0;
+ Iterator<? extends Callable<T>> it = tasks.iterator();
+
+ // Start one task for sure; the rest incrementally
+ futures.add(ecs.submit(it.next()));
+ --ntasks;
+ int active = 1;
+
+ for (;;) {
+ Future<T> f = ecs.poll();
+ if (f == null) {
+ if (ntasks > 0) {
+ --ntasks;
+ futures.add(ecs.submit(it.next()));
+ ++active;
+ } else if (active == 0) {
+ break;
+ } else if (timed) {
+ f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
+ if (f == null) {
+ throw new TimeoutException();
+ }
+ long now = System.nanoTime();
+ nanos -= now - lastTime;
+ lastTime = now;
+ } else {
+ f = ecs.take();
+ }
+ }
+ if (f != null) {
+ --active;
+ try {
+ return f.get();
+ } catch (ExecutionException eex) {
+ ee = eex;
+ } catch (RuntimeException rex) {
+ ee = new ExecutionException(rex);
+ }
+ }
+ }
+
+ if (ee == null) {
+ ee = new ExecutionException(null);
+ }
+ throw ee;
+ } finally {
+ for (Future<T> f : futures) {
+ f.cancel(true);
+ }
+ }
+ }
+
@Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
- return invokeAnyImpl(this, tasks, false, 0);
+ return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
throw new AssertionError();
}
@@ -74,7 +141,7 @@ public abstract class AbstractListeningExecutorService implements ListeningExecu
@Override public <T> T invokeAny(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
- return invokeAnyImpl(this, tasks, true, unit.toNanos(timeout));
+ return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
@Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
diff --git a/guava/src/com/google/common/util/concurrent/AbstractScheduledService.java b/guava/src/com/google/common/util/concurrent/AbstractScheduledService.java
index 949f76a..f847205 100644
--- a/guava/src/com/google/common/util/concurrent/AbstractScheduledService.java
+++ b/guava/src/com/google/common/util/concurrent/AbstractScheduledService.java
@@ -21,11 +21,9 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
@@ -98,8 +96,9 @@ public abstract class AbstractScheduledService implements Service {
*
* <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory
* methods, these provide {@link Scheduler} instances for the common use case of running the
- * service with a fixed schedule. If more flexibility is needed then consider subclassing
- * {@link CustomScheduler}.
+ * service with a fixed schedule. If more flexibility is needed then consider subclassing the
+ * {@link CustomScheduler} abstract class in preference to creating your own {@link Scheduler}
+ * implementation.
*
* @author Luke Sandberg
* @since 11.0
@@ -230,9 +229,6 @@ public abstract class AbstractScheduledService implements Service {
}
};
- /** Constructor for use by subclasses. */
- protected AbstractScheduledService() {}
-
/**
* Run one iteration of the scheduled task. If any invocation of this method throws an exception,
* the service will transition to the {@link Service.State#FAILED} state and this method will no
@@ -240,19 +236,11 @@ public abstract class AbstractScheduledService implements Service {
*/
protected abstract void runOneIteration() throws Exception;
- /**
- * Start the service.
- *
- * <p>By default this method does nothing.
- */
- protected void startUp() throws Exception {}
+ /** Start the service. */
+ protected abstract void startUp() throws Exception;
- /**
- * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}.
- *
- * <p>By default this method does nothing.
- */
- protected void shutDown() throws Exception {}
+ /** Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. */
+ protected abstract void shutDown() throws Exception;
/**
* Returns the {@link Scheduler} object used to configure this service. This method will only be
@@ -262,56 +250,19 @@ public abstract class AbstractScheduledService implements Service {
/**
* Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp},
- * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the
- * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this
- * service {@linkplain Service.State#TERMINATED terminates} or
- * {@linkplain Service.State#TERMINATED fails}. Subclasses may override this method to supply a
- * custom {@link ScheduledExecutorService} instance. This method is guaranteed to only be called
- * once.
+ * {@link #runOneIteration} and {@link #shutDown} methods. The executor will not be
+ * {@link ScheduledExecutorService#shutdown} when this service stops. Subclasses may override this
+ * method to use a custom {@link ScheduledExecutorService} instance.
*
* <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread
- * pool that sets the name of the thread to the {@linkplain #serviceName() service name}.
- * Also, the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the
- * service {@linkplain Service.State#TERMINATED terminates} or
- * {@linkplain Service.State#TERMINATED fails}.
+ * pool. This method will only be called once.
*/
protected ScheduledExecutorService executor() {
- final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactory() {
- @Override public Thread newThread(Runnable runnable) {
- return MoreExecutors.newThread(serviceName(), runnable);
- }
- });
- // Add a listener to shutdown the executor after the service is stopped. This ensures that the
- // JVM shutdown will not be prevented from exiting after this service has stopped or failed.
- // Technically this listener is added after start() was called so it is a little gross, but it
- // is called within doStart() so we know that the service cannot terminate or fail concurrently
- // with adding this listener so it is impossible to miss an event that we are interested in.
- addListener(new Listener() {
- @Override public void starting() {}
- @Override public void running() {}
- @Override public void stopping(State from) {}
- @Override public void terminated(State from) {
- executor.shutdown();
- }
- @Override public void failed(State from, Throwable failure) {
- executor.shutdown();
- }}, MoreExecutors.sameThreadExecutor());
- return executor;
+ return Executors.newSingleThreadScheduledExecutor();
}
- /**
- * Returns the name of this service. {@link AbstractScheduledService} may include the name in
- * debugging output.
- *
- * @since 14.0
- */
- protected String serviceName() {
- return getClass().getSimpleName();
- }
-
@Override public String toString() {
- return serviceName() + " [" + state() + "]";
+ return getClass().getSimpleName() + " [" + state() + "]";
}
// We override instead of using ForwardingService so that these can be final.
@@ -341,20 +292,6 @@ public abstract class AbstractScheduledService implements Service {
}
/**
- * @since 13.0
- */
- @Override public final void addListener(Listener listener, Executor executor) {
- delegate.addListener(listener, executor);
- }
-
- /**
- * @since 14.0
- */
- @Override public final Throwable failureCause() {
- return delegate.failureCause();
- }
-
- /**
* A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to
* use a dynamically changing schedule. After every execution of the task, assuming it hasn't
* been cancelled, the {@link #getNextSchedule} method will be called.
diff --git a/guava/src/com/google/common/util/concurrent/AbstractService.java b/guava/src/com/google/common/util/concurrent/AbstractService.java
index f028a59..f84b374 100644
--- a/guava/src/com/google/common/util/concurrent/AbstractService.java
+++ b/guava/src/com/google/common/util/concurrent/AbstractService.java
@@ -16,148 +16,68 @@
package com.google.common.util.concurrent;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.Beta;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
import com.google.common.util.concurrent.Service.State; // javadoc needs this
-import java.util.List;
-import java.util.Queue;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.Immutable;
/**
- * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
- * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
- * callbacks. Its subclasses must manage threads manually; consider
- * {@link AbstractExecutionThreadService} if you need only a single execution thread.
+ * Base class for implementing services that can handle {@link #doStart} and
+ * {@link #doStop} requests, responding to them with {@link #notifyStarted()}
+ * and {@link #notifyStopped()} callbacks. Its subclasses must manage threads
+ * manually; consider {@link AbstractExecutionThreadService} if you need only a
+ * single execution thread.
*
* @author Jesse Wilson
- * @author Luke Sandberg
* @since 1.0
*/
@Beta
public abstract class AbstractService implements Service {
- private static final Logger logger = Logger.getLogger(AbstractService.class.getName());
+
private final ReentrantLock lock = new ReentrantLock();
private final Transition startup = new Transition();
private final Transition shutdown = new Transition();
/**
- * The listeners to notify during a state transition.
- */
- @GuardedBy("lock")
- private final List<ListenerExecutorPair> listeners = Lists.newArrayList();
-
- /**
- * The queue of listeners that are waiting to be executed.
- *
- * <p>Enqueue operations should be protected by {@link #lock} while dequeue operations should be
- * protected by the implicit lock on this object. Dequeue operations should be executed atomically
- * with the execution of the {@link Runnable} and additionally the {@link #lock} should not be
- * held when the listeners are being executed. Use {@link #executeListeners} for this operation.
- * This is necessary to ensure that elements on the queue are executed in the correct order.
- * Enqueue operations should be protected so that listeners are added in the correct order. We use
- * a concurrent queue implementation so that enqueues can be executed concurrently with dequeues.
+ * The internal state, which equals external state unless
+ * shutdownWhenStartupFinishes is true. Guarded by {@code lock}.
*/
- @GuardedBy("queuedListeners")
- private final Queue<Runnable> queuedListeners = Queues.newConcurrentLinkedQueue();
-
+ private State state = State.NEW;
+
/**
- * The current state of the service. This should be written with the lock held but can be read
- * without it because it is an immutable object in a volatile field. This is desirable so that
- * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
- * without grabbing the lock.
- *
- * <p>To update this field correctly the lock must be held to guarantee that the state is
- * consistent.
+ * If true, the user requested a shutdown while the service was still starting
+ * up. Guarded by {@code lock}.
*/
- @GuardedBy("lock")
- private volatile StateSnapshot snapshot = new StateSnapshot(State.NEW);
+ private boolean shutdownWhenStartupFinishes = false;
- /** Constructor for use by subclasses. */
- protected AbstractService() {
- // Add a listener to update the futures. This needs to be added first so that it is executed
- // before the other listeners. This way the other listeners can access the completed futures.
- addListener(
- new Listener() {
- @Override public void starting() {}
-
- @Override public void running() {
- startup.set(State.RUNNING);
- }
-
- @Override public void stopping(State from) {
- if (from == State.STARTING) {
- startup.set(State.STOPPING);
- }
- }
-
- @Override public void terminated(State from) {
- if (from == State.NEW) {
- startup.set(State.TERMINATED);
- }
- shutdown.set(State.TERMINATED);
- }
-
- @Override public void failed(State from, Throwable failure) {
- switch (from) {
- case STARTING:
- startup.setException(failure);
- shutdown.setException(new Exception("Service failed to start.", failure));
- break;
- case RUNNING:
- shutdown.setException(new Exception("Service failed while running", failure));
- break;
- case STOPPING:
- shutdown.setException(failure);
- break;
- case TERMINATED: /* fall-through */
- case FAILED: /* fall-through */
- case NEW: /* fall-through */
- default:
- throw new AssertionError("Unexpected from state: " + from);
- }
- }
- },
- MoreExecutors.sameThreadExecutor());
- }
-
/**
- * This method is called by {@link #start} to initiate service startup. The invocation of this
- * method should cause a call to {@link #notifyStarted()}, either during this method's run, or
- * after it has returned. If startup fails, the invocation should cause a call to
- * {@link #notifyFailed(Throwable)} instead.
+ * This method is called by {@link #start} to initiate service startup. The
+ * invocation of this method should cause a call to {@link #notifyStarted()},
+ * either during this method's run, or after it has returned. If startup
+ * fails, the invocation should cause a call to {@link
+ * #notifyFailed(Throwable)} instead.
*
- * <p>This method should return promptly; prefer to do work on a different thread where it is
- * convenient. It is invoked exactly once on service startup, even when {@link #start} is called
- * multiple times.
+ * <p>This method should return promptly; prefer to do work on a different
+ * thread where it is convenient. It is invoked exactly once on service
+ * startup, even when {@link #start} is called multiple times.
*/
protected abstract void doStart();
/**
- * This method should be used to initiate service shutdown. The invocation of this method should
- * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
- * returned. If shutdown fails, the invocation should cause a call to
- * {@link #notifyFailed(Throwable)} instead.
+ * This method should be used to initiate service shutdown. The invocation
+ * of this method should cause a call to {@link #notifyStopped()}, either
+ * during this method's run, or after it has returned. If shutdown fails, the
+ * invocation should cause a call to {@link #notifyFailed(Throwable)} instead.
*
- * <p> This method should return promptly; prefer to do work on a different thread where it is
- * convenient. It is invoked exactly once on service shutdown, even when {@link #stop} is called
- * multiple times.
+ * <p>This method should return promptly; prefer to do work on a different
+ * thread where it is convenient. It is invoked exactly once on service
+ * shutdown, even when {@link #stop} is called multiple times.
*/
protected abstract void doStop();
@@ -165,16 +85,15 @@ public abstract class AbstractService implements Service {
public final ListenableFuture<State> start() {
lock.lock();
try {
- if (snapshot.state == State.NEW) {
- snapshot = new StateSnapshot(State.STARTING);
- starting();
+ if (state == State.NEW) {
+ state = State.STARTING;
doStart();
}
} catch (Throwable startupFailure) {
+ // put the exception in the future, the user can get it via Future.get()
notifyFailed(startupFailure);
} finally {
lock.unlock();
- executeListeners();
}
return startup;
@@ -184,33 +103,22 @@ public abstract class AbstractService implements Service {
public final ListenableFuture<State> stop() {
lock.lock();
try {
- switch (snapshot.state) {
- case NEW:
- snapshot = new StateSnapshot(State.TERMINATED);
- terminated(State.NEW);
- break;
- case STARTING:
- snapshot = new StateSnapshot(State.STARTING, true, null);
- stopping(State.STARTING);
- break;
- case RUNNING:
- snapshot = new StateSnapshot(State.STOPPING);
- stopping(State.RUNNING);
- doStop();
- break;
- case STOPPING:
- case TERMINATED:
- case FAILED:
- // do nothing
- break;
- default:
- throw new AssertionError("Unexpected state: " + snapshot.state);
+ if (state == State.NEW) {
+ state = State.TERMINATED;
+ startup.set(State.TERMINATED);
+ shutdown.set(State.TERMINATED);
+ } else if (state == State.STARTING) {
+ shutdownWhenStartupFinishes = true;
+ startup.set(State.STOPPING);
+ } else if (state == State.RUNNING) {
+ state = State.STOPPING;
+ doStop();
}
} catch (Throwable shutdownFailure) {
+ // put the exception in the future, the user can get it via Future.get()
notifyFailed(shutdownFailure);
} finally {
lock.unlock();
- executeListeners();
}
return shutdown;
@@ -227,91 +135,84 @@ public abstract class AbstractService implements Service {
}
/**
- * Implementing classes should invoke this method once their service has started. It will cause
- * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
+ * Implementing classes should invoke this method once their service has
+ * started. It will cause the service to transition from {@link
+ * State#STARTING} to {@link State#RUNNING}.
*
- * @throws IllegalStateException if the service is not {@link State#STARTING}.
+ * @throws IllegalStateException if the service is not
+ * {@link State#STARTING}.
*/
protected final void notifyStarted() {
lock.lock();
try {
- if (snapshot.state != State.STARTING) {
+ if (state != State.STARTING) {
IllegalStateException failure = new IllegalStateException(
- "Cannot notifyStarted() when the service is " + snapshot.state);
+ "Cannot notifyStarted() when the service is " + state);
notifyFailed(failure);
throw failure;
}
- if (snapshot.shutdownWhenStartupFinishes) {
- snapshot = new StateSnapshot(State.STOPPING);
- // We don't call listeners here because we already did that when we set the
- // shutdownWhenStartupFinishes flag.
- doStop();
+ state = State.RUNNING;
+ if (shutdownWhenStartupFinishes) {
+ stop();
} else {
- snapshot = new StateSnapshot(State.RUNNING);
- running();
+ startup.set(State.RUNNING);
}
} finally {
lock.unlock();
- executeListeners();
}
}
/**
- * Implementing classes should invoke this method once their service has stopped. It will cause
- * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}.
+ * Implementing classes should invoke this method once their service has
+ * stopped. It will cause the service to transition from {@link
+ * State#STOPPING} to {@link State#TERMINATED}.
*
- * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor
- * {@link State#RUNNING}.
+ * @throws IllegalStateException if the service is neither {@link
+ * State#STOPPING} nor {@link State#RUNNING}.
*/
protected final void notifyStopped() {
lock.lock();
try {
- if (snapshot.state != State.STOPPING && snapshot.state != State.RUNNING) {
+ if (state != State.STOPPING && state != State.RUNNING) {
IllegalStateException failure = new IllegalStateException(
- "Cannot notifyStopped() when the service is " + snapshot.state);
+ "Cannot notifyStopped() when the service is " + state);
notifyFailed(failure);
throw failure;
}
- State previous = snapshot.state;
- snapshot = new StateSnapshot(State.TERMINATED);
- terminated(previous);
+
+ state = State.TERMINATED;
+ shutdown.set(State.TERMINATED);
} finally {
lock.unlock();
- executeListeners();
}
}
/**
- * Invoke this method to transition the service to the {@link State#FAILED}. The service will
- * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
- * or otherwise cannot be started nor stopped.
+ * Invoke this method to transition the service to the
+ * {@link State#FAILED}. The service will <b>not be stopped</b> if it
+ * is running. Invoke this method when a service has failed critically or
+ * otherwise cannot be started nor stopped.
*/
protected final void notifyFailed(Throwable cause) {
checkNotNull(cause);
lock.lock();
try {
- switch (snapshot.state) {
- case NEW:
- case TERMINATED:
- throw new IllegalStateException("Failed while in state:" + snapshot.state, cause);
- case RUNNING:
- case STARTING:
- case STOPPING:
- State previous = snapshot.state;
- snapshot = new StateSnapshot(State.FAILED, false, cause);
- failed(previous, cause);
- break;
- case FAILED:
- // Do nothing
- break;
- default:
- throw new AssertionError("Unexpected state: " + snapshot.state);
+ if (state == State.STARTING) {
+ startup.setException(cause);
+ shutdown.setException(new Exception(
+ "Service failed to start.", cause));
+ } else if (state == State.STOPPING) {
+ shutdown.setException(cause);
+ } else if (state == State.RUNNING) {
+ shutdown.setException(new Exception("Service failed while running", cause));
+ } else if (state == State.NEW || state == State.TERMINATED) {
+ throw new IllegalStateException("Failed while in state:" + state, cause);
}
+ state = State.FAILED;
} finally {
lock.unlock();
- executeListeners();
}
}
@@ -322,28 +223,12 @@ public abstract class AbstractService implements Service {
@Override
public final State state() {
- return snapshot.externalState();
- }
-
- /**
- * @since 14.0
- */
- @Override
- public final Throwable failureCause() {
- return snapshot.failureCause();
- }
-
- /**
- * @since 13.0
- */
- @Override
- public final void addListener(Listener listener, Executor executor) {
- checkNotNull(listener, "listener");
- checkNotNull(executor, "executor");
lock.lock();
try {
- if (snapshot.state != State.TERMINATED && snapshot.state != State.FAILED) {
- listeners.add(new ListenerExecutorPair(listener, executor));
+ if (shutdownWhenStartupFinishes && state == State.STARTING) {
+ return State.STOPPING;
+ } else {
+ return state;
}
} finally {
lock.unlock();
@@ -368,182 +253,4 @@ public abstract class AbstractService implements Service {
}
}
}
-
- /**
- * Attempts to execute all the listeners in {@link #queuedListeners} while not holding the
- * {@link #lock}.
- */
- private void executeListeners() {
- if (!lock.isHeldByCurrentThread()) {
- synchronized (queuedListeners) {
- Runnable listener;
- while ((listener = queuedListeners.poll()) != null) {
- listener.run();
- }
- }
- }
- }
-
- @GuardedBy("lock")
- private void starting() {
- for (final ListenerExecutorPair pair : listeners) {
- queuedListeners.add(new Runnable() {
- @Override public void run() {
- pair.execute(new Runnable() {
- @Override public void run() {
- pair.listener.starting();
- }
- });
- }
- });
- }
- }
-
- @GuardedBy("lock")
- private void running() {
- for (final ListenerExecutorPair pair : listeners) {
- queuedListeners.add(new Runnable() {
- @Override public void run() {
- pair.execute(new Runnable() {
- @Override public void run() {
- pair.listener.running();
- }
- });
- }
- });
- }
- }
-
- @GuardedBy("lock")
- private void stopping(final State from) {
- for (final ListenerExecutorPair pair : listeners) {
- queuedListeners.add(new Runnable() {
- @Override public void run() {
- pair.execute(new Runnable() {
- @Override public void run() {
- pair.listener.stopping(from);
- }
- });
- }
- });
- }
- }
-
- @GuardedBy("lock")
- private void terminated(final State from) {
- for (final ListenerExecutorPair pair : listeners) {
- queuedListeners.add(new Runnable() {
- @Override public void run() {
- pair.execute(new Runnable() {
- @Override public void run() {
- pair.listener.terminated(from);
- }
- });
- }
- });
- }
- // There are no more state transitions so we can clear this out.
- listeners.clear();
- }
-
- @GuardedBy("lock")
- private void failed(final State from, final Throwable cause) {
- for (final ListenerExecutorPair pair : listeners) {
- queuedListeners.add(new Runnable() {
- @Override public void run() {
- pair.execute(new Runnable() {
- @Override public void run() {
- pair.listener.failed(from, cause);
- }
- });
- }
- });
- }
- // There are no more state transitions so we can clear this out.
- listeners.clear();
- }
-
- /** A simple holder for a listener and its executor. */
- private static class ListenerExecutorPair {
- final Listener listener;
- final Executor executor;
-
- ListenerExecutorPair(Listener listener, Executor executor) {
- this.listener = listener;
- this.executor = executor;
- }
-
- /**
- * Executes the given {@link Runnable} on {@link #executor} logging and swallowing all
- * exceptions
- */
- void execute(Runnable runnable) {
- try {
- executor.execute(runnable);
- } catch (Exception e) {
- logger.log(Level.SEVERE, "Exception while executing listener " + listener
- + " with executor " + executor, e);
- }
- }
- }
-
- /**
- * An immutable snapshot of the current state of the service. This class represents a consistent
- * snapshot of the state and therefore it can be used to answer simple queries without needing to
- * grab a lock.
- */
- @Immutable
- private static final class StateSnapshot {
- /**
- * The internal state, which equals external state unless
- * shutdownWhenStartupFinishes is true.
- */
- final State state;
-
- /**
- * If true, the user requested a shutdown while the service was still starting
- * up.
- */
- final boolean shutdownWhenStartupFinishes;
-
- /**
- * The exception that caused this service to fail. This will be {@code null}
- * unless the service has failed.
- */
- @Nullable
- final Throwable failure;
-
- StateSnapshot(State internalState) {
- this(internalState, false, null);
- }
-
- StateSnapshot(
- State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) {
- checkArgument(!shutdownWhenStartupFinishes || internalState == State.STARTING,
- "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
- internalState);
- checkArgument(!(failure != null ^ internalState == State.FAILED),
- "A failure cause should be set if and only if the state is failed. Got %s and %s "
- + "instead.", internalState, failure);
- this.state = internalState;
- this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
- this.failure = failure;
- }
-
- /** @see Service#state() */
- State externalState() {
- if (shutdownWhenStartupFinishes && state == State.STARTING) {
- return State.STOPPING;
- } else {
- return state;
- }
- }
-
- /** @see Service#failureCause() */
- Throwable failureCause() {
- checkState(state == State.FAILED,
- "failureCause() is only valid if the service has failed, service is %s", state);
- return failure;
- }
- }
}
diff --git a/guava/src/com/google/common/util/concurrent/AsyncFunction.java b/guava/src/com/google/common/util/concurrent/AsyncFunction.java
index cdb1228..441c029 100644
--- a/guava/src/com/google/common/util/concurrent/AsyncFunction.java
+++ b/guava/src/com/google/common/util/concurrent/AsyncFunction.java
@@ -16,6 +16,8 @@
package com.google.common.util.concurrent;
+import com.google.common.annotations.Beta;
+
import java.util.concurrent.Future;
/**
@@ -25,6 +27,7 @@ import java.util.concurrent.Future;
* @author Chris Povirk
* @since 11.0
*/
+@Beta
public interface AsyncFunction<I, O> {
/**
* Returns an output {@code Future} to use in place of the given {@code
diff --git a/guava/src/com/google/common/util/concurrent/AtomicDouble.java b/guava/src/com/google/common/util/concurrent/AtomicDouble.java
index d007f45..0f38fb9 100644
--- a/guava/src/com/google/common/util/concurrent/AtomicDouble.java
+++ b/guava/src/com/google/common/util/concurrent/AtomicDouble.java
@@ -14,9 +14,10 @@
package com.google.common.util.concurrent;
+import com.google.common.annotations.Beta;
+
import static java.lang.Double.doubleToRawLongBits;
import static java.lang.Double.longBitsToDouble;
-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
/**
@@ -42,16 +43,17 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
*
* <p>It is possible to write a more scalable updater, at the cost of
* giving up strict atomicity. See for example
- * <a href="http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166edocs/jsr166e/DoubleAdder.html">
- * DoubleAdder</a>
+ * <a href="http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166edocs/jsr166e/DoubleAdder.html"
+ * DoubleAdder>
* and
- * <a href="http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166edocs/jsr166e/DoubleMaxUpdater.html">
- * DoubleMaxUpdater</a>.
+ * <a href="http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166edocs/jsr166e/DoubleMaxUpdater.html"
+ * DoubleMaxUpdater>.
*
* @author Doug Lea
* @author Martin Buchholz
* @since 11.0
*/
+@Beta
public class AtomicDouble extends Number implements java.io.Serializable {
private static final long serialVersionUID = 0L;
diff --git a/guava/src/com/google/common/util/concurrent/AtomicDoubleArray.java b/guava/src/com/google/common/util/concurrent/AtomicDoubleArray.java
index 407cd7c..37f4c5c 100644
--- a/guava/src/com/google/common/util/concurrent/AtomicDoubleArray.java
+++ b/guava/src/com/google/common/util/concurrent/AtomicDoubleArray.java
@@ -13,9 +13,10 @@
package com.google.common.util.concurrent;
+import com.google.common.annotations.Beta;
+
import static java.lang.Double.doubleToRawLongBits;
import static java.lang.Double.longBitsToDouble;
-
import java.util.concurrent.atomic.AtomicLongArray;
/**
@@ -39,6 +40,7 @@ import java.util.concurrent.atomic.AtomicLongArray;
* @author Martin Buchholz
* @since 11.0
*/
+@Beta
public class AtomicDoubleArray implements java.io.Serializable {
private static final long serialVersionUID = 0L;
diff --git a/guava/src/com/google/common/util/concurrent/AtomicLongMap.java b/guava/src/com/google/common/util/concurrent/AtomicLongMap.java
index d0af965..c49f84c 100644
--- a/guava/src/com/google/common/util/concurrent/AtomicLongMap.java
+++ b/guava/src/com/google/common/util/concurrent/AtomicLongMap.java
@@ -1,24 +1,10 @@
-/*
- * Copyright (C) 2011 The Guava Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+// Copyright 2011 Google Inc. All Rights Reserved.
package com.google.common.util.concurrent;
import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.common.annotations.GwtCompatible;
+import com.google.common.annotations.Beta;
import com.google.common.base.Function;
import com.google.common.collect.Maps;
@@ -50,7 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
* @author Charles Fry
* @since 11.0
*/
-@GwtCompatible
+@Beta
public final class AtomicLongMap<K> {
private final ConcurrentHashMap<K, AtomicLong> map;
diff --git a/guava/src/com/google/common/util/concurrent/Atomics.java b/guava/src/com/google/common/util/concurrent/Atomics.java
index efb7946..fece83d 100644
--- a/guava/src/com/google/common/util/concurrent/Atomics.java
+++ b/guava/src/com/google/common/util/concurrent/Atomics.java
@@ -16,6 +16,8 @@
package com.google.common.util.concurrent;
+import com.google.common.annotations.Beta;
+
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
@@ -28,6 +30,7 @@ import javax.annotation.Nullable;
* @author Kurt Alfred Kluever
* @since 10.0
*/
+@Beta
public final class Atomics {
private Atomics() {}
diff --git a/guava/src/com/google/common/util/concurrent/CycleDetectingLockFactory.java b/guava/src/com/google/common/util/concurrent/CycleDetectingLockFactory.java
deleted file mode 100644
index 528fc8e..0000000
--- a/guava/src/com/google/common/util/concurrent/CycleDetectingLockFactory.java
+++ /dev/null
@@ -1,1038 +0,0 @@
-/*
- * Copyright (C) 2011 The Guava Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.common.util.concurrent;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.annotations.Beta;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.MapMaker;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * The {@code CycleDetectingLockFactory} creates {@link ReentrantLock} instances and
- * {@link ReentrantReadWriteLock} instances that detect potential deadlock by checking
- * for cycles in lock acquisition order.
- * <p>
- * Potential deadlocks detected when calling the {@code lock()},
- * {@code lockInterruptibly()}, or {@code tryLock()} methods will result in the
- * execution of the {@link Policy} specified when creating the factory. The
- * currently available policies are:
- * <ul>
- * <li>DISABLED
- * <li>WARN
- * <li>THROW
- * </ul>
- * The locks created by a factory instance will detect lock acquisition cycles
- * with locks created by other {@code CycleDetectingLockFactory} instances
- * (except those with {@code Policy.DISABLED}). A lock's behavior when a cycle
- * is detected, however, is defined by the {@code Policy} of the factory that
- * created it. This allows detection of cycles across components while
- * delegating control over lock behavior to individual components.
- * <p>
- * Applications are encouraged to use a {@code CycleDetectingLockFactory} to
- * create any locks for which external/unmanaged code is executed while the lock
- * is held. (See caveats under <strong>Performance</strong>).
- * <p>
- * <strong>Cycle Detection</strong>
- * <p>
- * Deadlocks can arise when locks are acquired in an order that forms a cycle.
- * In a simple example involving two locks and two threads, deadlock occurs
- * when one thread acquires Lock A, and then Lock B, while another thread
- * acquires Lock B, and then Lock A:
- * <pre>
- * Thread1: acquire(LockA) --X acquire(LockB)
- * Thread2: acquire(LockB) --X acquire(LockA)
- * </pre>
- * Neither thread will progress because each is waiting for the other. In more
- * complex applications, cycles can arise from interactions among more than 2
- * locks:
- * <pre>
- * Thread1: acquire(LockA) --X acquire(LockB)
- * Thread2: acquire(LockB) --X acquire(LockC)
- * ...
- * ThreadN: acquire(LockN) --X acquire(LockA)
- * </pre>
- * The implementation detects cycles by constructing a directed graph in which
- * each lock represents a node and each edge represents an acquisition ordering
- * between two locks.
- * <ul>
- * <li>Each lock adds (and removes) itself to/from a ThreadLocal Set of acquired
- * locks when the Thread acquires its first hold (and releases its last
- * remaining hold).
- * <li>Before the lock is acquired, the lock is checked against the current set
- * of acquired locks---to each of the acquired locks, an edge from the
- * soon-to-be-acquired lock is either verified or created.
- * <li>If a new edge needs to be created, the outgoing edges of the acquired
- * locks are traversed to check for a cycle that reaches the lock to be
- * acquired. If no cycle is detected, a new "safe" edge is created.
- * <li>If a cycle is detected, an "unsafe" (cyclic) edge is created to represent
- * a potential deadlock situation, and the appropriate Policy is executed.
- * </ul>
- * Note that detection of potential deadlock does not necessarily indicate that
- * deadlock will happen, as it is possible that higher level application logic
- * prevents the cyclic lock acquisition from occurring. One example of a false
- * positive is:
- * <pre>
- * LockA -&gt; LockB -&gt; LockC
- * LockA -&gt; LockC -&gt; LockB
- * </pre>
- *
- * <strong>ReadWriteLocks</strong>
- * <p>
- * While {@code ReadWriteLock} instances have different properties and can form cycles
- * without potential deadlock, this class treats {@code ReadWriteLock} instances as
- * equivalent to traditional exclusive locks. Although this increases the false
- * positives that the locks detect (i.e. cycles that will not actually result in
- * deadlock), it simplifies the algorithm and implementation considerably. The
- * assumption is that a user of this factory wishes to eliminate any cyclic
- * acquisition ordering.
- * <p>
- * <strong>Explicit Lock Acquisition Ordering</strong>
- * <p>
- * The {@link CycleDetectingLockFactory.WithExplicitOrdering} class can be used
- * to enforce an application-specific ordering in addition to performing general
- * cycle detection.
- * <p>
- * <strong>Garbage Collection</strong>
- * <p>
- * In order to allow proper garbage collection of unused locks, the edges of
- * the lock graph are weak references.
- * <p>
- * <strong>Performance</strong>
- * <p>
- * The extra bookkeeping done by cycle detecting locks comes at some cost to
- * performance. Benchmarks (as of December 2011) show that:
- *
- * <ul>
- * <li>for an unnested {@code lock()} and {@code unlock()}, a cycle detecting
- * lock takes 38ns as opposed to the 24ns taken by a plain lock.
- * <li>for nested locking, the cost increases with the depth of the nesting:
- * <ul>
- * <li> 2 levels: average of 64ns per lock()/unlock()
- * <li> 3 levels: average of 77ns per lock()/unlock()
- * <li> 4 levels: average of 99ns per lock()/unlock()
- * <li> 5 levels: average of 103ns per lock()/unlock()
- * <li>10 levels: average of 184ns per lock()/unlock()
- * <li>20 levels: average of 393ns per lock()/unlock()
- * </ul>
- * </ul>
- *
- * As such, the CycleDetectingLockFactory may not be suitable for
- * performance-critical applications which involve tightly-looped or
- * deeply-nested locking algorithms.
- *
- * @author Darick Tong
- * @since 13.0
- */
-@Beta
-@ThreadSafe
-public class CycleDetectingLockFactory {
-
- /**
- * Encapsulates the action to be taken when a potential deadlock is
- * encountered. Clients can use one of the predefined {@link Policies} or
- * specify a custom implementation. Implementations must be thread-safe.
- *
- * @since 13.0
- */
- @Beta
- @ThreadSafe
- public interface Policy {
-
- /**
- * Called when a potential deadlock is encountered. Implementations can
- * throw the given {@code exception} and/or execute other desired logic.
- * <p>
- * Note that the method will be called even upon an invocation of
- * {@code tryLock()}. Although {@code tryLock()} technically recovers from
- * deadlock by eventually timing out, this behavior is chosen based on the
- * assumption that it is the application's wish to prohibit any cyclical
- * lock acquisitions.
- */
- void handlePotentialDeadlock(PotentialDeadlockException exception);
- }
-
- /**
- * Pre-defined {@link Policy} implementations.
- *
- * @since 13.0
- */
- @Beta
- public enum Policies implements Policy {
- /**
- * When potential deadlock is detected, this policy results in the throwing
- * of the {@code PotentialDeadlockException} indicating the potential
- * deadlock, which includes stack traces illustrating the cycle in lock
- * acquisition order.
- */
- THROW {
- @Override
- public void handlePotentialDeadlock(PotentialDeadlockException e) {
- throw e;
- }
- },
-
- /**
- * When potential deadlock is detected, this policy results in the logging
- * of a {@link Level#SEVERE} message indicating the potential deadlock,
- * which includes stack traces illustrating the cycle in lock acquisition
- * order.
- */
- WARN {
- @Override
- public void handlePotentialDeadlock(PotentialDeadlockException e) {
- logger.log(Level.SEVERE, "Detected potential deadlock", e);
- }
- },
-
- /**
- * Disables cycle detection. This option causes the factory to return
- * unmodified lock implementations provided by the JDK, and is provided to
- * allow applications to easily parameterize when cycle detection is
- * enabled.
- * <p>
- * Note that locks created by a factory with this policy will <em>not</em>
- * participate the cycle detection performed by locks created by other
- * factories.
- */
- DISABLED {
- @Override
- public void handlePotentialDeadlock(PotentialDeadlockException e) {
- }
- };
- }
-
- /**
- * Creates a new factory with the specified policy.
- */
- public static CycleDetectingLockFactory newInstance(Policy policy) {
- return new CycleDetectingLockFactory(policy);
- }
-
- /**
- * Equivalent to {@code newReentrantLock(lockName, false)}.
- */
- public ReentrantLock newReentrantLock(String lockName) {
- return newReentrantLock(lockName, false);
- }
-
- /**
- * Creates a {@link ReentrantLock} with the given fairness policy. The
- * {@code lockName} is used in the warning or exception output to help
- * identify the locks involved in the detected deadlock.
- */
- public ReentrantLock newReentrantLock(String lockName, boolean fair) {
- return policy == Policies.DISABLED ? new ReentrantLock(fair)
- : new CycleDetectingReentrantLock(
- new LockGraphNode(lockName), fair);
- }
-
- /**
- * Equivalent to {@code newReentrantReadWriteLock(lockName, false)}.
- */
- public ReentrantReadWriteLock newReentrantReadWriteLock(String lockName) {
- return newReentrantReadWriteLock(lockName, false);
- }
-
- /**
- * Creates a {@link ReentrantReadWriteLock} with the given fairness policy.
- * The {@code lockName} is used in the warning or exception output to help
- * identify the locks involved in the detected deadlock.
- */
- public ReentrantReadWriteLock newReentrantReadWriteLock(
- String lockName, boolean fair) {
- return policy == Policies.DISABLED ? new ReentrantReadWriteLock(fair)
- : new CycleDetectingReentrantReadWriteLock(
- new LockGraphNode(lockName), fair);
- }
-
- // A static mapping from an Enum type to its set of LockGraphNodes.
- private static final Map<Class<? extends Enum>,
- Map<? extends Enum, LockGraphNode>> lockGraphNodesPerType =
- new MapMaker().weakKeys().makeComputingMap(
- new OrderedLockGraphNodesCreator());
-
- /**
- * Creates a {@code CycleDetectingLockFactory.WithExplicitOrdering<E>}.
- */
- public static <E extends Enum<E>> WithExplicitOrdering<E>
- newInstanceWithExplicitOrdering(Class<E> enumClass, Policy policy) {
- // OrderedLockGraphNodesCreator maps each enumClass to a Map with the
- // corresponding enum key type.
- checkNotNull(enumClass);
- checkNotNull(policy);
- @SuppressWarnings("unchecked")
- Map<E, LockGraphNode> lockGraphNodes =
- (Map<E, LockGraphNode>) lockGraphNodesPerType.get(enumClass);
- return new WithExplicitOrdering<E>(policy, lockGraphNodes);
- }
-
- /**
- * A {@code CycleDetectingLockFactory.WithExplicitOrdering} provides the
- * additional enforcement of an application-specified ordering of lock
- * acquisitions. The application defines the allowed ordering with an
- * {@code Enum} whose values each correspond to a lock type. The order in
- * which the values are declared dictates the allowed order of lock
- * acquisition. In other words, locks corresponding to smaller values of
- * {@link Enum#ordinal()} should only be acquired before locks with larger
- * ordinals. Example:
- *
- * <pre> {@code
- * enum MyLockOrder {
- * FIRST, SECOND, THIRD;
- * }
- *
- * CycleDetectingLockFactory.WithExplicitOrdering<MyLockOrder> factory =
- * CycleDetectingLockFactory.newInstanceWithExplicitOrdering(Policies.THROW);
- *
- * Lock lock1 = factory.newReentrantLock(MyLockOrder.FIRST);
- * Lock lock2 = factory.newReentrantLock(MyLockOrder.SECOND);
- * Lock lock3 = factory.newReentrantLock(MyLockOrder.THIRD);
- *
- * lock1.lock();
- * lock3.lock();
- * lock2.lock(); // will throw an IllegalStateException
- * }</pre>
- *
- * As with all locks created by instances of {@code CycleDetectingLockFactory}
- * explicitly ordered locks participate in general cycle detection with all
- * other cycle detecting locks, and a lock's behavior when detecting a cyclic
- * lock acquisition is defined by the {@code Policy} of the factory that
- * created it.
- * <p>
- * Note, however, that although multiple locks can be created for a given Enum
- * value, whether it be through separate factory instances or through multiple
- * calls to the same factory, attempting to acquire multiple locks with the
- * same Enum value (within the same thread) will result in an
- * IllegalStateException regardless of the factory's policy. For example:
- *
- * <pre> {@code
- * CycleDetectingLockFactory.WithExplicitOrdering<MyLockOrder> factory1 =
- * CycleDetectingLockFactory.newInstanceWithExplicitOrdering(...);
- * CycleDetectingLockFactory.WithExplicitOrdering<MyLockOrder> factory2 =
- * CycleDetectingLockFactory.newInstanceWithExplicitOrdering(...);
- *
- * Lock lockA = factory1.newReentrantLock(MyLockOrder.FIRST);
- * Lock lockB = factory1.newReentrantLock(MyLockOrder.FIRST);
- * Lock lockC = factory2.newReentrantLock(MyLockOrder.FIRST);
- *
- * lockA.lock();
- *
- * lockB.lock(); // will throw an IllegalStateException
- * lockC.lock(); // will throw an IllegalStateException
- *
- * lockA.lock(); // reentrant acquisition is okay
- * }</pre>
- *
- * It is the responsibility of the application to ensure that multiple lock
- * instances with the same rank are never acquired in the same thread.
- *
- * @param <E> The Enum type representing the explicit lock ordering.
- * @since 13.0
- */
- @Beta
- public static final class WithExplicitOrdering<E extends Enum<E>>
- extends CycleDetectingLockFactory {
-
- private final Map<E, LockGraphNode> lockGraphNodes;
-
- @VisibleForTesting
- WithExplicitOrdering(
- Policy policy, Map<E, LockGraphNode> lockGraphNodes) {
- super(policy);
- this.lockGraphNodes = lockGraphNodes;
- }
-
- /**
- * Equivalent to {@code newReentrantLock(rank, false)}.
- */
- public ReentrantLock newReentrantLock(E rank) {
- return newReentrantLock(rank, false);
- }
-
- /**
- * Creates a {@link ReentrantLock} with the given fairness policy and rank.
- * The values returned by {@link Enum#getDeclaringClass()} and
- * {@link Enum#name()} are used to describe the lock in warning or
- * exception output.
- *
- * @throws IllegalStateException If the factory has already created a
- * {@code Lock} with the specified rank.
- */
- public ReentrantLock newReentrantLock(E rank, boolean fair) {
- return policy == Policies.DISABLED ? new ReentrantLock(fair)
- : new CycleDetectingReentrantLock(lockGraphNodes.get(rank), fair);
- }
-
- /**
- * Equivalent to {@code newReentrantReadWriteLock(rank, false)}.
- */
- public ReentrantReadWriteLock newReentrantReadWriteLock(E rank) {
- return newReentrantReadWriteLock(rank, false);
- }
-
- /**
- * Creates a {@link ReentrantReadWriteLock} with the given fairness policy
- * and rank. The values returned by {@link Enum#getDeclaringClass()} and
- * {@link Enum#name()} are used to describe the lock in warning or exception
- * output.
- *
- * @throws IllegalStateException If the factory has already created a
- * {@code Lock} with the specified rank.
- */
- public ReentrantReadWriteLock newReentrantReadWriteLock(
- E rank, boolean fair) {
- return policy == Policies.DISABLED ? new ReentrantReadWriteLock(fair)
- : new CycleDetectingReentrantReadWriteLock(
- lockGraphNodes.get(rank), fair);
- }
- }
-
- /**
- * For a given Enum type, creates an immutable map from each of the Enum's
- * values to a corresponding LockGraphNode, with the
- * {@code allowedPriorLocks} and {@code disallowedPriorLocks} prepopulated
- * with nodes according to the natural ordering of the associated Enum values.
- */
- @VisibleForTesting
- static class OrderedLockGraphNodesCreator
- implements Function<Class<? extends Enum>,
- Map<? extends Enum, LockGraphNode>> {
-
- @Override
- @SuppressWarnings("unchecked") // There's no way to properly express with
- // wildcards the recursive Enum type required by createNodesFor(), and the
- // Map/Function types must use wildcards since they accept any Enum class.
- public Map<? extends Enum, LockGraphNode> apply(
- Class<? extends Enum> clazz) {
- return createNodesFor(clazz);
- }
-
- <E extends Enum<E>> Map<E, LockGraphNode> createNodesFor(Class<E> clazz) {
- EnumMap<E, LockGraphNode> map = Maps.newEnumMap(clazz);
- E[] keys = clazz.getEnumConstants();
- final int numKeys = keys.length;
- ArrayList<LockGraphNode> nodes =
- Lists.newArrayListWithCapacity(numKeys);
- // Create a LockGraphNode for each enum value.
- for (E key : keys) {
- LockGraphNode node = new LockGraphNode(getLockName(key));
- nodes.add(node);
- map.put(key, node);
- }
- // Pre-populate all allowedPriorLocks with nodes of smaller ordinal.
- for (int i = 1; i < numKeys; i++) {
- nodes.get(i).checkAcquiredLocks(Policies.THROW, nodes.subList(0, i));
- }
- // Pre-populate all disallowedPriorLocks with nodes of larger ordinal.
- for (int i = 0; i < numKeys - 1; i++) {
- nodes.get(i).checkAcquiredLocks(
- Policies.DISABLED, nodes.subList(i + 1, numKeys));
- }
- return Collections.unmodifiableMap(map);
- }
-
- /**
- * For the given Enum value {@code rank}, returns the value's
- * {@code "EnumClass.name"}, which is used in exception and warning
- * output.
- */
- private String getLockName(Enum<?> rank) {
- return rank.getDeclaringClass().getSimpleName() + "." + rank.name();
- }
- }
-
- //////// Implementation /////////
-
- private static final Logger logger = Logger.getLogger(
- CycleDetectingLockFactory.class.getName());
-
- final Policy policy;
-
- private CycleDetectingLockFactory(Policy policy) {
- this.policy = checkNotNull(policy);
- }
-
- /**
- * Tracks the currently acquired locks for each Thread, kept up to date by
- * calls to {@link #aboutToAcquire(CycleDetectingLock)} and
- * {@link #lockStateChanged(CycleDetectingLock)}.
- */
- // This is logically a Set, but an ArrayList is used to minimize the amount
- // of allocation done on lock()/unlock().
- private static final ThreadLocal<ArrayList<LockGraphNode>>
- acquiredLocks = new ThreadLocal<ArrayList<LockGraphNode>>() {
- @Override
- protected ArrayList<LockGraphNode> initialValue() {
- return Lists.<LockGraphNode>newArrayListWithCapacity(3);
- }
- };
-
- /**
- * A Throwable used to record a stack trace that illustrates an example of
- * a specific lock acquisition ordering. The top of the stack trace is
- * truncated such that it starts with the acquisition of the lock in
- * question, e.g.
- *
- * <pre>
- * com...ExampleStackTrace: LockB -&gt; LockC
- * at com...CycleDetectingReentrantLock.lock(CycleDetectingLockFactory.java:443)
- * at ...
- * at ...
- * at com...MyClass.someMethodThatAcquiresLockB(MyClass.java:123)
- * </pre>
- */
- private static class ExampleStackTrace extends IllegalStateException {
-
- static final StackTraceElement[] EMPTY_STACK_TRACE =
- new StackTraceElement[0];
-
- static Set<String> EXCLUDED_CLASS_NAMES = ImmutableSet.of(
- CycleDetectingLockFactory.class.getName(),
- ExampleStackTrace.class.getName(),
- LockGraphNode.class.getName());
-
- ExampleStackTrace(LockGraphNode node1, LockGraphNode node2) {
- super(node1.getLockName() + " -> " + node2.getLockName());
- StackTraceElement[] origStackTrace = getStackTrace();
- for (int i = 0, n = origStackTrace.length; i < n; i++) {
- if (WithExplicitOrdering.class.getName().equals(
- origStackTrace[i].getClassName())) {
- // For pre-populated disallowedPriorLocks edges, omit the stack trace.
- setStackTrace(EMPTY_STACK_TRACE);
- break;
- }
- if (!EXCLUDED_CLASS_NAMES.contains(origStackTrace[i].getClassName())) {
- setStackTrace(Arrays.copyOfRange(origStackTrace, i, n));
- break;
- }
- }
- }
- }
-
- /**
- * Represents a detected cycle in lock acquisition ordering. The exception
- * includes a causal chain of {@code ExampleStackTrace} instances to illustrate the
- * cycle, e.g.
- *
- * <pre>
- * com....PotentialDeadlockException: Potential Deadlock from LockC -&gt; ReadWriteA
- * at ...
- * at ...
- * Caused by: com...ExampleStackTrace: LockB -&gt; LockC
- * at ...
- * at ...
- * Caused by: com...ExampleStackTrace: ReadWriteA -&gt; LockB
- * at ...
- * at ...
- * </pre>
- *
- * Instances are logged for the {@code Policies.WARN}, and thrown for
- * {@code Policies.THROW}.
- *
- * @since 13.0
- */
- @Beta
- public static final class PotentialDeadlockException
- extends ExampleStackTrace {
-
- private final ExampleStackTrace conflictingStackTrace;
-
- private PotentialDeadlockException(
- LockGraphNode node1,
- LockGraphNode node2,
- ExampleStackTrace conflictingStackTrace) {
- super(node1, node2);
- this.conflictingStackTrace = conflictingStackTrace;
- initCause(conflictingStackTrace);
- }
-
- public ExampleStackTrace getConflictingStackTrace() {
- return conflictingStackTrace;
- }
-
- /**
- * Appends the chain of messages from the {@code conflictingStackTrace} to
- * the original {@code message}.
- */
- @Override
- public String getMessage() {
- StringBuilder message = new StringBuilder(super.getMessage());
- for (Throwable t = conflictingStackTrace; t != null; t = t.getCause()) {
- message.append(", ").append(t.getMessage());
- }
- return message.toString();
- }
- }
-
- /**
- * Internal Lock implementations implement the {@code CycleDetectingLock}
- * interface, allowing the detection logic to treat all locks in the same
- * manner.
- */
- private interface CycleDetectingLock {
-
- /** @return the {@link LockGraphNode} associated with this lock. */
- LockGraphNode getLockGraphNode();
-
- /** @return {@code true} if the current thread has acquired this lock. */
- boolean isAcquiredByCurrentThread();
- }
-
- /**
- * A {@code LockGraphNode} associated with each lock instance keeps track of
- * the directed edges in the lock acquisition graph.
- */
- private static class LockGraphNode {
-
- /**
- * The map tracking the locks that are known to be acquired before this
- * lock, each associated with an example stack trace. Locks are weakly keyed
- * to allow proper garbage collection when they are no longer referenced.
- */
- final Map<LockGraphNode, ExampleStackTrace> allowedPriorLocks =
- new MapMaker().weakKeys().makeMap();
-
- /**
- * The map tracking lock nodes that can cause a lock acquisition cycle if
- * acquired before this node.
- */
- final Map<LockGraphNode, PotentialDeadlockException>
- disallowedPriorLocks = new MapMaker().weakKeys().makeMap();
-
- final String lockName;
-
- LockGraphNode(String lockName) {
- this.lockName = Preconditions.checkNotNull(lockName);
- }
-
- String getLockName() {
- return lockName;
- }
-
- void checkAcquiredLocks(
- Policy policy, List<LockGraphNode> acquiredLocks) {
- for (int i = 0, size = acquiredLocks.size(); i < size; i++) {
- checkAcquiredLock(policy, acquiredLocks.get(i));
- }
- }
-
- /**
- * Checks the acquisition-ordering between {@code this}, which is about to
- * be acquired, and the specified {@code acquiredLock}.
- * <p>
- * When this method returns, the {@code acquiredLock} should be in either
- * the {@code preAcquireLocks} map, for the case in which it is safe to
- * acquire {@code this} after the {@code acquiredLock}, or in the
- * {@code disallowedPriorLocks} map, in which case it is not safe.
- */
- void checkAcquiredLock(Policy policy, LockGraphNode acquiredLock) {
- // checkAcquiredLock() should never be invoked by a lock that has already
- // been acquired. For unordered locks, aboutToAcquire() ensures this by
- // checking isAcquiredByCurrentThread(). For ordered locks, however, this
- // can happen because multiple locks may share the same LockGraphNode. In
- // this situation, throw an IllegalStateException as defined by contract
- // described in the documentation of WithExplicitOrdering.
- Preconditions.checkState(
- this != acquiredLock,
- "Attempted to acquire multiple locks with the same rank " +
- acquiredLock.getLockName());
-
- if (allowedPriorLocks.containsKey(acquiredLock)) {
- // The acquisition ordering from "acquiredLock" to "this" has already
- // been verified as safe. In a properly written application, this is
- // the common case.
- return;
- }
- PotentialDeadlockException previousDeadlockException =
- disallowedPriorLocks.get(acquiredLock);
- if (previousDeadlockException != null) {
- // Previously determined to be an unsafe lock acquisition.
- // Create a new PotentialDeadlockException with the same causal chain
- // (the example cycle) as that of the cached exception.
- PotentialDeadlockException exception = new PotentialDeadlockException(
- acquiredLock, this,
- previousDeadlockException.getConflictingStackTrace());
- policy.handlePotentialDeadlock(exception);
- return;
- }
- // Otherwise, it's the first time seeing this lock relationship. Look for
- // a path from the acquiredLock to this.
- Set<LockGraphNode> seen = Sets.newIdentityHashSet();
- ExampleStackTrace path = acquiredLock.findPathTo(this, seen);
-
- if (path == null) {
- // this can be safely acquired after the acquiredLock.
- //
- // Note that there is a race condition here which can result in missing
- // a cyclic edge: it's possible for two threads to simultaneous find
- // "safe" edges which together form a cycle. Preventing this race
- // condition efficiently without _introducing_ deadlock is probably
- // tricky. For now, just accept the race condition---missing a warning
- // now and then is still better than having no deadlock detection.
- allowedPriorLocks.put(
- acquiredLock, new ExampleStackTrace(acquiredLock, this));
- } else {
- // Unsafe acquisition order detected. Create and cache a
- // PotentialDeadlockException.
- PotentialDeadlockException exception =
- new PotentialDeadlockException(acquiredLock, this, path);
- disallowedPriorLocks.put(acquiredLock, exception);
- policy.handlePotentialDeadlock(exception);
- }
- }
-
- /**
- * Performs a depth-first traversal of the graph edges defined by each
- * node's {@code allowedPriorLocks} to find a path between {@code this} and
- * the specified {@code lock}.
- *
- * @return If a path was found, a chained {@link ExampleStackTrace}
- * illustrating the path to the {@code lock}, or {@code null} if no path
- * was found.
- */
- @Nullable
- private ExampleStackTrace findPathTo(
- LockGraphNode node, Set<LockGraphNode> seen) {
- if (!seen.add(this)) {
- return null; // Already traversed this node.
- }
- ExampleStackTrace found = allowedPriorLocks.get(node);
- if (found != null) {
- return found; // Found a path ending at the node!
- }
- // Recurse the edges.
- for (Map.Entry<LockGraphNode, ExampleStackTrace> entry :
- allowedPriorLocks.entrySet()) {
- LockGraphNode preAcquiredLock = entry.getKey();
- found = preAcquiredLock.findPathTo(node, seen);
- if (found != null) {
- // One of this node's allowedPriorLocks found a path. Prepend an
- // ExampleStackTrace(preAcquiredLock, this) to the returned chain of
- // ExampleStackTraces.
- ExampleStackTrace path =
- new ExampleStackTrace(preAcquiredLock, this);
- path.setStackTrace(entry.getValue().getStackTrace());
- path.initCause(found);
- return path;
- }
- }
- return null;
- }
- }
-
- /**
- * CycleDetectingLock implementations must call this method before attempting
- * to acquire the lock.
- */
- private void aboutToAcquire(CycleDetectingLock lock) {
- if (!lock.isAcquiredByCurrentThread()) {
- ArrayList<LockGraphNode> acquiredLockList = acquiredLocks.get();
- LockGraphNode node = lock.getLockGraphNode();
- node.checkAcquiredLocks(policy, acquiredLockList);
- acquiredLockList.add(node);
- }
- }
-
- /**
- * CycleDetectingLock implementations must call this method in a
- * {@code finally} clause after any attempt to change the lock state,
- * including both lock and unlock attempts. Failure to do so can result in
- * corrupting the acquireLocks set.
- */
- private void lockStateChanged(CycleDetectingLock lock) {
- if (!lock.isAcquiredByCurrentThread()) {
- ArrayList<LockGraphNode> acquiredLockList = acquiredLocks.get();
- LockGraphNode node = lock.getLockGraphNode();
- // Iterate in reverse because locks are usually locked/unlocked in a
- // LIFO order.
- for (int i = acquiredLockList.size() - 1; i >=0; i--) {
- if (acquiredLockList.get(i) == node) {
- acquiredLockList.remove(i);
- break;
- }
- }
- }
- }
-
- final class CycleDetectingReentrantLock
- extends ReentrantLock implements CycleDetectingLock {
-
- private final LockGraphNode lockGraphNode;
-
- private CycleDetectingReentrantLock(
- LockGraphNode lockGraphNode, boolean fair) {
- super(fair);
- this.lockGraphNode = Preconditions.checkNotNull(lockGraphNode);
- }
-
- ///// CycleDetectingLock methods. /////
-
- @Override
- public LockGraphNode getLockGraphNode() {
- return lockGraphNode;
- }
-
- @Override
- public boolean isAcquiredByCurrentThread() {
- return isHeldByCurrentThread();
- }
-
- ///// Overridden ReentrantLock methods. /////
-
- @Override
- public void lock() {
- aboutToAcquire(this);
- try {
- super.lock();
- } finally {
- lockStateChanged(this);
- }
- }
-
- @Override
- public void lockInterruptibly() throws InterruptedException {
- aboutToAcquire(this);
- try {
- super.lockInterruptibly();
- } finally {
- lockStateChanged(this);
- }
- }
-
- @Override
- public boolean tryLock() {
- aboutToAcquire(this);
- try {
- return super.tryLock();
- } finally {
- lockStateChanged(this);
- }
- }
-
- @Override
- public boolean tryLock(long timeout, TimeUnit unit)
- throws InterruptedException {
- aboutToAcquire(this);
- try {
- return super.tryLock(timeout, unit);
- } finally {
- lockStateChanged(this);
- }
- }
-
- @Override
- public void unlock() {
- try {
- super.unlock();
- } finally {
- lockStateChanged(this);
- }
- }
- }
-
- final class CycleDetectingReentrantReadWriteLock
- extends ReentrantReadWriteLock implements CycleDetectingLock {
-
- // These ReadLock/WriteLock implementations shadow those in the
- // ReentrantReadWriteLock superclass. They are simply wrappers around the
- // internal Sync object, so this is safe since the shadowed locks are never
- // exposed or used.
- private final CycleDetectingReentrantReadLock readLock;
- private final CycleDetectingReentrantWriteLock writeLock;
-
- private final LockGraphNode lockGraphNode;
-
- private CycleDetectingReentrantReadWriteLock(
- LockGraphNode lockGraphNode, boolean fair) {
- super(fair);
- this.readLock = new CycleDetectingReentrantReadLock(this);
- this.writeLock = new CycleDetectingReentrantWriteLock(this);
- this.lockGraphNode = Preconditions.checkNotNull(lockGraphNode);
- }
-
- ///// Overridden ReentrantReadWriteLock methods. /////
-
- @Override
- public ReadLock readLock() {
- return readLock;
- }
-
- @Override
- public WriteLock writeLock() {
- return writeLock;
- }
-
- ///// CycleDetectingLock methods. /////
-
- @Override
- public LockGraphNode getLockGraphNode() {
- return lockGraphNode;
- }
-
- @Override
- public boolean isAcquiredByCurrentThread() {
- return isWriteLockedByCurrentThread() || getReadHoldCount() > 0;
- }
- }
-
- private class CycleDetectingReentrantReadLock
- extends ReentrantReadWriteLock.ReadLock {
-
- final CycleDetectingReentrantReadWriteLock readWriteLock;
-
- CycleDetectingReentrantReadLock(
- CycleDetectingReentrantReadWriteLock readWriteLock) {
- super(readWriteLock);
- this.readWriteLock = readWriteLock;
- }
-
- @Override
- public void lock() {
- aboutToAcquire(readWriteLock);
- try {
- super.lock();
- } finally {
- lockStateChanged(readWriteLock);
- }
- }
-
- @Override
- public void lockInterruptibly() throws InterruptedException {
- aboutToAcquire(readWriteLock);
- try {
- super.lockInterruptibly();
- } finally {
- lockStateChanged(readWriteLock);
- }
- }
-
- @Override
- public boolean tryLock() {
- aboutToAcquire(readWriteLock);
- try {
- return super.tryLock();
- } finally {
- lockStateChanged(readWriteLock);
- }
- }
-
- @Override
- public boolean tryLock(long timeout, TimeUnit unit)
- throws InterruptedException {
- aboutToAcquire(readWriteLock);
- try {
- return super.tryLock(timeout, unit);
- } finally {
- lockStateChanged(readWriteLock);
- }
- }
-
- @Override
- public void unlock() {
- try {
- super.unlock();
- } finally {
- lockStateChanged(readWriteLock);
- }
- }
- }
-
- private class CycleDetectingReentrantWriteLock
- extends ReentrantReadWriteLock.WriteLock {
-
- final CycleDetectingReentrantReadWriteLock readWriteLock;
-
- CycleDetectingReentrantWriteLock(
- CycleDetectingReentrantReadWriteLock readWriteLock) {
- super(readWriteLock);
- this.readWriteLock = readWriteLock;
- }
-
- @Override
- public void lock() {
- aboutToAcquire(readWriteLock);
- try {
- super.lock();
- } finally {
- lockStateChanged(readWriteLock);
- }
- }
-
- @Override
- public void lockInterruptibly() throws InterruptedException {
- aboutToAcquire(readWriteLock);
- try {
- super.lockInterruptibly();
- } finally {
- lockStateChanged(readWriteLock);
- }
- }
-
- @Override
- public boolean tryLock() {
- aboutToAcquire(readWriteLock);
- try {
- return super.tryLock();
- } finally {
- lockStateChanged(readWriteLock);
- }
- }
-
- @Override
- public boolean tryLock(long timeout, TimeUnit unit)
- throws InterruptedException {
- aboutToAcquire(readWriteLock);
- try {
- return super.tryLock(timeout, unit);
- } finally {
- lockStateChanged(readWriteLock);
- }
- }
-
- @Override
- public void unlock() {
- try {
- super.unlock();
- } finally {
- lockStateChanged(readWriteLock);
- }
- }
- }
-}
diff --git a/guava/src/com/google/common/util/concurrent/ExecutionError.java b/guava/src/com/google/common/util/concurrent/ExecutionError.java
index e462969..ce588eb 100644
--- a/guava/src/com/google/common/util/concurrent/ExecutionError.java
+++ b/guava/src/com/google/common/util/concurrent/ExecutionError.java
@@ -17,8 +17,7 @@
package com.google.common.util.concurrent;
import com.google.common.annotations.GwtCompatible;
-
-import javax.annotation.Nullable;
+import com.google.common.annotations.Beta;
/**
* {@link Error} variant of {@link java.util.concurrent.ExecutionException}. As
@@ -32,6 +31,7 @@ import javax.annotation.Nullable;
* @author Chris Povirk
* @since 10.0
*/
+@Beta
@GwtCompatible
public class ExecutionError extends Error {
/**
@@ -42,21 +42,21 @@ public class ExecutionError extends Error {
/**
* Creates a new instance with the given detail message.
*/
- protected ExecutionError(@Nullable String message) {
+ protected ExecutionError(String message) {
super(message);
}
/**
* Creates a new instance with the given detail message and cause.
*/
- public ExecutionError(@Nullable String message, @Nullable Error cause) {
+ public ExecutionError(String message, Error cause) {
super(message, cause);
}
/**
* Creates a new instance with the given cause.
*/
- public ExecutionError(@Nullable Error cause) {
+ public ExecutionError(Error cause) {
super(cause);
}
diff --git a/guava/src/com/google/common/util/concurrent/ExecutionList.java b/guava/src/com/google/common/util/concurrent/ExecutionList.java
index e1a40d0..d1b78f5 100644
--- a/guava/src/com/google/common/util/concurrent/ExecutionList.java
+++ b/guava/src/com/google/common/util/concurrent/ExecutionList.java
@@ -16,7 +16,6 @@
package com.google.common.util.concurrent;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -46,7 +45,7 @@ import java.util.logging.Logger;
public final class ExecutionList {
// Logger to log exceptions caught when running runnables.
- @VisibleForTesting static final Logger log =
+ private static final Logger log =
Logger.getLogger(ExecutionList.class.getName());
// The runnable,executor pairs to execute.
diff --git a/guava/src/com/google/common/util/concurrent/FakeTimeLimiter.java b/guava/src/com/google/common/util/concurrent/FakeTimeLimiter.java
index 75a1e94..890479d 100644
--- a/guava/src/com/google/common/util/concurrent/FakeTimeLimiter.java
+++ b/guava/src/com/google/common/util/concurrent/FakeTimeLimiter.java
@@ -16,8 +16,6 @@
package com.google.common.util.concurrent;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import com.google.common.annotations.Beta;
import java.util.concurrent.Callable;
@@ -38,16 +36,12 @@ public final class FakeTimeLimiter implements TimeLimiter {
@Override
public <T> T newProxy(T target, Class<T> interfaceType, long timeoutDuration,
TimeUnit timeoutUnit) {
- checkNotNull(target);
- checkNotNull(interfaceType);
- checkNotNull(timeoutUnit);
return target; // ha ha
}
@Override
public <T> T callWithTimeout(Callable<T> callable, long timeoutDuration,
TimeUnit timeoutUnit, boolean amInterruptible) throws Exception {
- checkNotNull(timeoutUnit);
return callable.call(); // fooled you
}
}
diff --git a/guava/src/com/google/common/util/concurrent/ForwardingService.java b/guava/src/com/google/common/util/concurrent/ForwardingService.java
index 8774232..e39e4db 100644
--- a/guava/src/com/google/common/util/concurrent/ForwardingService.java
+++ b/guava/src/com/google/common/util/concurrent/ForwardingService.java
@@ -19,22 +19,13 @@ package com.google.common.util.concurrent;
import com.google.common.annotations.Beta;
import com.google.common.collect.ForwardingObject;
-import java.util.concurrent.Executor;
-
/**
* A {@link Service} that forwards all method calls to another service.
*
- * @deprecated Instead of using a {@link ForwardingService}, consider using the
- * {@link Service.Listener} functionality to hook into the {@link Service}
- * lifecycle, or if you really do need to provide access to some Service
- * methods, consider just providing the few that you actually need (e.g. just
- * {@link #startAndWait()}) and not implementing Service.
- *
* @author Chris Nokleberg
* @since 1.0
*/
@Beta
-@Deprecated
public abstract class ForwardingService extends ForwardingObject
implements Service {
@@ -66,20 +57,6 @@ public abstract class ForwardingService extends ForwardingObject
@Override public boolean isRunning() {
return delegate().isRunning();
}
-
- /**
- * @since 13.0
- */
- @Override public void addListener(Listener listener, Executor executor) {
- delegate().addListener(listener, executor);
- }
-
- /**
- * @since 14.0
- */
- @Override public Throwable failureCause() {
- return delegate().failureCause();
- }
/**
* A sensible default implementation of {@link #startAndWait()}, in terms of
diff --git a/guava/src/com/google/common/util/concurrent/FutureCallback.java b/guava/src/com/google/common/util/concurrent/FutureCallback.java
index 735d6ab..7b39d4a 100644
--- a/guava/src/com/google/common/util/concurrent/FutureCallback.java
+++ b/guava/src/com/google/common/util/concurrent/FutureCallback.java
@@ -16,6 +16,8 @@
package com.google.common.util.concurrent;
+import com.google.common.annotations.Beta;
+
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -28,6 +30,7 @@ import java.util.concurrent.Future;
* @author Anthony Zana
* @since 10.0
*/
+@Beta
public interface FutureCallback<V> {
/**
* Invoked with the result of the {@code Future} computation when it is
diff --git a/guava/src/com/google/common/util/concurrent/FutureFallback.java b/guava/src/com/google/common/util/concurrent/FutureFallback.java
deleted file mode 100644
index 7d03c67..0000000
--- a/guava/src/com/google/common/util/concurrent/FutureFallback.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (C) 2011 The Guava Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.common.util.concurrent;
-
-import com.google.common.annotations.Beta;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-/**
- * Provides a backup {@code Future} to replace an earlier failed {@code Future}.
- * An implementation of this interface can be applied to an input {@code Future}
- * with {@link Futures#withFallback}.
- *
- * @param <V> the result type of the provided backup {@code Future}
- *
- * @author Bruno Diniz
- * @since 14.0
- */
-@Beta
-public interface FutureFallback<V> {
- /**
- * Returns a {@code Future} to be used in place of the {@code Future} that
- * failed with the given exception. The exception is provided so that the
- * {@code Fallback} implementation can conditionally determine whether to
- * propagate the exception or to attempt to recover.
- *
- * @param t the exception that made the future fail. If the future's {@link
- * Future#get() get} method throws an {@link ExecutionException}, then the
- * cause is passed to this method. Any other thrown object is passed
- * unaltered.
- */
- ListenableFuture<V> create(Throwable t) throws Exception;
-}
diff --git a/guava/src/com/google/common/util/concurrent/Futures.java b/guava/src/com/google/common/util/concurrent/Futures.java
index aad6b43..dc703c5 100644
--- a/guava/src/com/google/common/util/concurrent/Futures.java
+++ b/guava/src/com/google/common/util/concurrent/Futures.java
@@ -21,14 +21,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
+import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
+import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
import static java.lang.Thread.currentThread;
import static java.util.Arrays.asList;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
import com.google.common.annotations.Beta;
import com.google.common.base.Function;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
@@ -38,27 +39,22 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* Static utility methods pertaining to the {@link Future} interface.
*
- * <p>Many of these methods use the {@link ListenableFuture} API; consult the
- * Guava User Guide article on <a href=
- * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained">
- * {@code ListenableFuture}</a>.
- *
* @author Kevin Bourrillion
* @author Nishant Thakkar
* @author Sven Mawson
@@ -75,7 +71,7 @@ public final class Futures {
*
* <p>The given mapping function will be applied to an
* {@link InterruptedException}, a {@link CancellationException}, or an
- * {@link ExecutionException}.
+ * {@link ExecutionException} with the actual cause of the exception.
* See {@link Future#get()} for details on the exceptions thrown.
*
* @since 9.0 (source-compatible since 1.0)
@@ -85,151 +81,6 @@ public final class Futures {
return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
}
- private abstract static class ImmediateFuture<V>
- implements ListenableFuture<V> {
-
- private static final Logger log =
- Logger.getLogger(ImmediateFuture.class.getName());
-
- @Override
- public void addListener(Runnable listener, Executor executor) {
- checkNotNull(listener, "Runnable was null.");
- checkNotNull(executor, "Executor was null.");
- try {
- executor.execute(listener);
- } catch (RuntimeException e) {
- // ListenableFuture's contract is that it will not throw unchecked
- // exceptions, so log the bad runnable and/or executor and swallow it.
- log.log(Level.SEVERE, "RuntimeException while executing runnable "
- + listener + " with executor " + executor, e);
- }
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return false;
- }
-
- @Override
- public abstract V get() throws ExecutionException;
-
- @Override
- public V get(long timeout, TimeUnit unit) throws ExecutionException {
- checkNotNull(unit);
- return get();
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return true;
- }
- }
-
- private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> {
-
- @Nullable private final V value;
-
- ImmediateSuccessfulFuture(@Nullable V value) {
- this.value = value;
- }
-
- @Override
- public V get() {
- return value;
- }
- }
-
- private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception>
- extends ImmediateFuture<V> implements CheckedFuture<V, X> {
-
- @Nullable private final V value;
-
- ImmediateSuccessfulCheckedFuture(@Nullable V value) {
- this.value = value;
- }
-
- @Override
- public V get() {
- return value;
- }
-
- @Override
- public V checkedGet() {
- return value;
- }
-
- @Override
- public V checkedGet(long timeout, TimeUnit unit) {
- checkNotNull(unit);
- return value;
- }
- }
-
- private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> {
-
- private final Throwable thrown;
-
- ImmediateFailedFuture(Throwable thrown) {
- this.thrown = thrown;
- }
-
- @Override
- public V get() throws ExecutionException {
- throw new ExecutionException(thrown);
- }
- }
-
- private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> {
-
- private final CancellationException thrown;
-
- ImmediateCancelledFuture() {
- this.thrown = new CancellationException("Immediate cancelled future.");
- }
-
- @Override
- public boolean isCancelled() {
- return true;
- }
-
- @Override
- public V get() {
- throw AbstractFuture.cancellationExceptionWithCause(
- "Task was cancelled.", thrown);
- }
- }
-
- private static class ImmediateFailedCheckedFuture<V, X extends Exception>
- extends ImmediateFuture<V> implements CheckedFuture<V, X> {
-
- private final X thrown;
-
- ImmediateFailedCheckedFuture(X thrown) {
- this.thrown = thrown;
- }
-
- @Override
- public V get() throws ExecutionException {
- throw new ExecutionException(thrown);
- }
-
- @Override
- public V checkedGet() throws X {
- throw thrown;
- }
-
- @Override
- public V checkedGet(long timeout, TimeUnit unit) throws X {
- checkNotNull(unit);
- throw thrown;
- }
- }
-
/**
* Creates a {@code ListenableFuture} which has its value set immediately upon
* construction. The getters just return the value. This {@code Future} can't
@@ -237,7 +88,9 @@ public final class Futures {
* {@code true}.
*/
public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
- return new ImmediateSuccessfulFuture<V>(value);
+ SettableFuture<V> future = SettableFuture.create();
+ future.set(value);
+ return future;
}
/**
@@ -250,7 +103,14 @@ public final class Futures {
*/
public static <V, X extends Exception> CheckedFuture<V, X>
immediateCheckedFuture(@Nullable V value) {
- return new ImmediateSuccessfulCheckedFuture<V, X>(value);
+ SettableFuture<V> future = SettableFuture.create();
+ future.set(value);
+ return Futures.makeChecked(future, new Function<Exception, X>() {
+ @Override
+ public X apply(Exception e) {
+ throw new AssertionError("impossible");
+ }
+ });
}
/**
@@ -261,21 +121,15 @@ public final class Futures {
* method always returns {@code true}. Calling {@code get()} will immediately
* throw the provided {@code Throwable} wrapped in an {@code
* ExecutionException}.
+ *
+ * @throws Error if the throwable is an {@link Error}.
*/
public static <V> ListenableFuture<V> immediateFailedFuture(
Throwable throwable) {
checkNotNull(throwable);
- return new ImmediateFailedFuture<V>(throwable);
- }
-
- /**
- * Creates a {@code ListenableFuture} which is cancelled immediately upon
- * construction, so that {@code isCancelled()} always returns {@code true}.
- *
- * @since 14.0
- */
- public static <V> ListenableFuture<V> immediateCancelledFuture() {
- return new ImmediateCancelledFuture<V>();
+ SettableFuture<V> future = SettableFuture.create();
+ future.setException(throwable);
+ return future;
}
/**
@@ -284,224 +138,149 @@ public final class Futures {
*
* <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
* method always returns {@code true}. Calling {@code get()} will immediately
- * throw the provided {@code Exception} wrapped in an {@code
+ * throw the provided {@code Throwable} wrapped in an {@code
* ExecutionException}, and calling {@code checkedGet()} will throw the
* provided exception itself.
+ *
+ * @throws Error if the throwable is an {@link Error}.
*/
public static <V, X extends Exception> CheckedFuture<V, X>
- immediateFailedCheckedFuture(X exception) {
+ immediateFailedCheckedFuture(final X exception) {
checkNotNull(exception);
- return new ImmediateFailedCheckedFuture<V, X>(exception);
+ return makeChecked(Futures.<V>immediateFailedFuture(exception),
+ new Function<Exception, X>() {
+ @Override
+ public X apply(Exception e) {
+ return exception;
+ }
+ });
}
/**
- * Returns a {@code Future} whose result is taken from the given primary
- * {@code input} or, if the primary input fails, from the {@code Future}
- * provided by the {@code fallback}. {@link FutureFallback#create} is not
- * invoked until the primary input has failed, so if the primary input
- * succeeds, it is never invoked. If, during the invocation of {@code
- * fallback}, an exception is thrown, this exception is used as the result of
- * the output {@code Future}.
- *
- * <p>Below is an example of a fallback that returns a default value if an
- * exception occurs:
- *
- * <pre> {@code
- * ListenableFuture<Integer> fetchCounterFuture = ...;
- *
- * // Falling back to a zero counter in case an exception happens when
- * // processing the RPC to fetch counters.
- * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
- * fetchCounterFuture, new FutureFallback<Integer>() {
- * public ListenableFuture<Integer> create(Throwable t) {
- * // Returning "0" as the default for the counter when the
- * // exception happens.
- * return immediateFuture(0);
- * }
- * });
- * }</pre>
- *
- * The fallback can also choose to propagate the original exception when
- * desired:
+ * <p>Returns a new {@code ListenableFuture} whose result is asynchronously
+ * derived from the result of the given {@code Future}. More precisely, the
+ * returned {@code Future} takes its result from a {@code Future} produced by
+ * applying the given {@code Function} to the result of the original {@code
+ * Future}. Example:
*
* <pre> {@code
- * ListenableFuture<Integer> fetchCounterFuture = ...;
- *
- * // Falling back to a zero counter only in case the exception was a
- * // TimeoutException.
- * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
- * fetchCounterFuture, new FutureFallback<Integer>() {
- * public ListenableFuture<Integer> create(Throwable t) {
- * if (t instanceof TimeoutException) {
- * return immediateFuture(0);
- * }
- * return immediateFailedFuture(t);
+ * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
+ * Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
+ * new Function<RowKey, ListenableFuture<QueryResult>>() {
+ * public ListenableFuture<QueryResult> apply(RowKey rowKey) {
+ * return dataService.read(rowKey);
* }
- * });
+ * };
+ * ListenableFuture<QueryResult> queryFuture =
+ * chain(rowKeyFuture, queryFunction);
* }</pre>
*
- * Note: If the derived {@code Future} is slow or heavyweight to create
- * (whether the {@code Future} itself is slow or heavyweight to complete is
- * irrelevant), consider {@linkplain #withFallback(ListenableFuture,
- * FutureFallback, Executor) supplying an executor}. If you do not supply an
- * executor, {@code withFallback} will use {@link
- * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
- * caveats for heavier operations. For example, the call to {@code
- * fallback.create} may run on an unpredictable or undesirable thread:
- *
- * <ul>
- * <li>If the input {@code Future} is done at the time {@code withFallback}
- * is called, {@code withFallback} will call {@code fallback.create} inline.
- * <li>If the input {@code Future} is not yet done, {@code withFallback} will
- * schedule {@code fallback.create} to be run by the thread that completes
- * the input {@code Future}, which may be an internal system thread such as
- * an RPC network thread.
- * </ul>
+ * <p>Note: This overload of {@code chain} is designed for cases in which the
+ * work of creating the derived future is fast and lightweight, as the method
+ * does not accept an {@code Executor} in which to perform the the work. For
+ * heavier derivations, this overload carries some caveats: First, the thread
+ * that the derivation runs in depends on whether the input {@code Future} is
+ * done at the time {@code chain} is called. In particular, if called late,
+ * {@code chain} will run the derivation in the thread that called {@code
+ * chain}. Second, derivations may run in an internal thread of the system
+ * responsible for the input {@code Future}, such as an RPC network thread.
+ * Finally, during the execution of a {@code sameThreadExecutor} {@code
+ * chain} function, all other registered but unexecuted listeners are
+ * prevented from running, even if those listeners are to run in other
+ * executors.
*
- * Also note that, regardless of which thread executes {@code
- * fallback.create}, all other registered but unexecuted listeners are
- * prevented from running during its execution, even if those listeners are
- * to run in other executors.
+ * <p>The returned {@code Future} attempts to keep its cancellation state in
+ * sync with that of the input future and that of the future returned by the
+ * chain function. That is, if the returned {@code Future} is cancelled, it
+ * will attempt to cancel the other two, and if either of the other two is
+ * cancelled, the returned {@code Future} will receive a callback in which it
+ * will attempt to cancel itself.
*
- * @param input the primary input {@code Future}
- * @param fallback the {@link FutureFallback} implementation to be called if
- * {@code input} fails
- * @since 14.0
+ * @param input The future to chain
+ * @param function A function to chain the results of the provided future
+ * to the results of the returned future. This will be run in the thread
+ * that notifies input it is complete.
+ * @return A future that holds result of the chain.
+ * @deprecated Convert your {@code Function} to a {@code AsyncFunction}, and
+ * use {@link #transform(ListenableFuture, AsyncFunction)}. This method is
+ * scheduled to be removed from Guava in Guava release 12.0.
*/
- public static <V> ListenableFuture<V> withFallback(
- ListenableFuture<? extends V> input,
- FutureFallback<? extends V> fallback) {
- return withFallback(input, fallback, sameThreadExecutor());
+ @Deprecated
+ public static <I, O> ListenableFuture<O> chain(
+ ListenableFuture<I> input,
+ Function<? super I, ? extends ListenableFuture<? extends O>> function) {
+ return chain(input, function, MoreExecutors.sameThreadExecutor());
}
/**
- * Returns a {@code Future} whose result is taken from the given primary
- * {@code input} or, if the primary input fails, from the {@code Future}
- * provided by the {@code fallback}. {@link FutureFallback#create} is not
- * invoked until the primary input has failed, so if the primary input
- * succeeds, it is never invoked. If, during the invocation of {@code
- * fallback}, an exception is thrown, this exception is used as the result of
- * the output {@code Future}.
- *
- * <p>Below is an example of a fallback that returns a default value if an
- * exception occurs:
+ * <p>Returns a new {@code ListenableFuture} whose result is asynchronously
+ * derived from the result of the given {@code Future}. More precisely, the
+ * returned {@code Future} takes its result from a {@code Future} produced by
+ * applying the given {@code Function} to the result of the original {@code
+ * Future}. Example:
*
* <pre> {@code
- * ListenableFuture<Integer> fetchCounterFuture = ...;
- *
- * // Falling back to a zero counter in case an exception happens when
- * // processing the RPC to fetch counters.
- * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
- * fetchCounterFuture, new FutureFallback<Integer>() {
- * public ListenableFuture<Integer> create(Throwable t) {
- * // Returning "0" as the default for the counter when the
- * // exception happens.
- * return immediateFuture(0);
+ * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
+ * Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
+ * new Function<RowKey, ListenableFuture<QueryResult>>() {
+ * public ListenableFuture<QueryResult> apply(RowKey rowKey) {
+ * return dataService.read(rowKey);
* }
- * }, sameThreadExecutor());
+ * };
+ * ListenableFuture<QueryResult> queryFuture =
+ * chain(rowKeyFuture, queryFunction, executor);
* }</pre>
*
- * The fallback can also choose to propagate the original exception when
- * desired:
- *
- * <pre> {@code
- * ListenableFuture<Integer> fetchCounterFuture = ...;
- *
- * // Falling back to a zero counter only in case the exception was a
- * // TimeoutException.
- * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
- * fetchCounterFuture, new FutureFallback<Integer>() {
- * public ListenableFuture<Integer> create(Throwable t) {
- * if (t instanceof TimeoutException) {
- * return immediateFuture(0);
- * }
- * return immediateFailedFuture(t);
- * }
- * }, sameThreadExecutor());
- * }</pre>
+ * <p>The returned {@code Future} attempts to keep its cancellation state in
+ * sync with that of the input future and that of the future returned by the
+ * chain function. That is, if the returned {@code Future} is cancelled, it
+ * will attempt to cancel the other two, and if either of the other two is
+ * cancelled, the returned {@code Future} will receive a callback in which it
+ * will attempt to cancel itself.
*
- * When the execution of {@code fallback.create} is fast and lightweight
- * (though the {@code Future} it returns need not meet these criteria),
- * consider {@linkplain #withFallback(ListenableFuture, FutureFallback)
- * omitting the executor} or explicitly specifying {@code
- * sameThreadExecutor}. However, be aware of the caveats documented in the
- * link above.
- *
- * @param input the primary input {@code Future}
- * @param fallback the {@link FutureFallback} implementation to be called if
- * {@code input} fails
- * @param executor the executor that runs {@code fallback} if {@code input}
- * fails
- * @since 14.0
- */
- public static <V> ListenableFuture<V> withFallback(
- ListenableFuture<? extends V> input,
- FutureFallback<? extends V> fallback, Executor executor) {
- checkNotNull(fallback);
- return new FallbackFuture<V>(input, fallback, executor);
- }
-
- /**
- * A future that falls back on a second, generated future, in case its
- * original future fails.
+ * <p>Note: For cases in which the work of creating the derived future is
+ * fast and lightweight, consider {@linkplain Futures#chain(ListenableFuture,
+ * Function) the other overload} or explicit use of {@code
+ * sameThreadExecutor}. For heavier derivations, this choice carries some
+ * caveats: First, the thread that the derivation runs in depends on whether
+ * the input {@code Future} is done at the time {@code chain} is called. In
+ * particular, if called late, {@code chain} will run the derivation in the
+ * thread that called {@code chain}. Second, derivations may run in an
+ * internal thread of the system responsible for the input {@code Future},
+ * such as an RPC network thread. Finally, during the execution of a {@code
+ * sameThreadExecutor} {@code chain} function, all other registered but
+ * unexecuted listeners are prevented from running, even if those listeners
+ * are to run in other executors.
+ *
+ * @param input The future to chain
+ * @param function A function to chain the results of the provided future
+ * to the results of the returned future.
+ * @param executor Executor to run the function in.
+ * @return A future that holds result of the chain.
+ * @deprecated Convert your {@code Function} to a {@code AsyncFunction}, and
+ * use {@link #transform(ListenableFuture, AsyncFunction, Executor)}. This
+ * method is scheduled to be removed from Guava in Guava release 12.0.
*/
- private static class FallbackFuture<V> extends AbstractFuture<V> {
-
- private volatile ListenableFuture<? extends V> running;
-
- FallbackFuture(ListenableFuture<? extends V> input,
- final FutureFallback<? extends V> fallback,
- final Executor executor) {
- running = input;
- addCallback(running, new FutureCallback<V>() {
- @Override
- public void onSuccess(V value) {
- set(value);
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (isCancelled()) {
- return;
- }
- try {
- running = fallback.create(t);
- if (isCancelled()) { // in case cancel called in the meantime
- running.cancel(wasInterrupted());
- return;
- }
- addCallback(running, new FutureCallback<V>() {
- @Override
- public void onSuccess(V value) {
- set(value);
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (running.isCancelled()) {
- cancel(false);
- } else {
- setException(t);
- }
- }
- }, sameThreadExecutor());
- } catch (Exception e) {
- setException(e);
- } catch (Error e) {
- setException(e); // note: rethrows
+ @Deprecated
+ public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
+ final Function<? super I, ? extends ListenableFuture<? extends O>>
+ function,
+ Executor executor) {
+ checkNotNull(function);
+ ChainingListenableFuture<I, O> chain =
+ new ChainingListenableFuture<I, O>(new AsyncFunction<I, O>() {
+ @Override
+ /*
+ * All methods of ListenableFuture are covariant, and we don't expose
+ * the object anywhere that would allow it to be downcast.
+ */
+ @SuppressWarnings("unchecked")
+ public ListenableFuture<O> apply(I input) {
+ return (ListenableFuture) function.apply(input);
}
- }
- }, executor);
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- if (super.cancel(mayInterruptIfRunning)) {
- running.cancel(mayInterruptIfRunning);
- return true;
- }
- return false;
- }
+ }, input);
+ input.addListener(chain, executor);
+ return chain;
}
/**
@@ -523,28 +302,20 @@ public final class Futures {
* transform(rowKeyFuture, queryFunction);
* }</pre>
*
- * Note: If the derived {@code Future} is slow or heavyweight to create
- * (whether the {@code Future} itself is slow or heavyweight to complete is
- * irrelevant), consider {@linkplain #transform(ListenableFuture,
- * AsyncFunction, Executor) supplying an executor}. If you do not supply an
- * executor, {@code transform} will use {@link
- * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
- * caveats for heavier operations. For example, the call to {@code
- * function.apply} may run on an unpredictable or undesirable thread:
- *
- * <ul>
- * <li>If the input {@code Future} is done at the time {@code transform} is
- * called, {@code transform} will call {@code function.apply} inline.
- * <li>If the input {@code Future} is not yet done, {@code transform} will
- * schedule {@code function.apply} to be run by the thread that completes the
- * input {@code Future}, which may be an internal system thread such as an
- * RPC network thread.
- * </ul>
- *
- * Also note that, regardless of which thread executes {@code
- * function.apply}, all other registered but unexecuted listeners are
- * prevented from running during its execution, even if those listeners are
- * to run in other executors.
+ * <p>Note: This overload of {@code transform} is designed for cases in which
+ * the work of creating the derived {@code Future} is fast and lightweight,
+ * as the method does not accept an {@code Executor} in which to perform the
+ * the work. (The created {@code Future} itself need not complete quickly.)
+ * For heavier operations, this overload carries some caveats: First, the
+ * thread that {@code function.apply} runs in depends on whether the input
+ * {@code Future} is done at the time {@code transform} is called. In
+ * particular, if called late, {@code transform} will run the operation in
+ * the thread that called {@code transform}. Second, {@code function.apply}
+ * may run in an internal thread of the system responsible for the input
+ * {@code Future}, such as an RPC network thread. Finally, during the
+ * execution of a {@code sameThreadExecutor} {@code function.apply}, all
+ * other registered but unexecuted listeners are prevented from running, even
+ * if those listeners are to run in other executors.
*
* <p>The returned {@code Future} attempts to keep its cancellation state in
* sync with that of the input future and that of the future returned by the
@@ -591,11 +362,20 @@ public final class Futures {
* cancelled, the returned {@code Future} will receive a callback in which it
* will attempt to cancel itself.
*
- * <p>When the execution of {@code function.apply} is fast and lightweight
- * (though the {@code Future} it returns need not meet these criteria),
- * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting
- * the executor} or explicitly specifying {@code sameThreadExecutor}.
- * However, be aware of the caveats documented in the link above.
+ * <p>Note: For cases in which the work of creating the derived future is
+ * fast and lightweight, consider {@linkplain
+ * Futures#transform(ListenableFuture, Function) the other overload} or
+ * explicit use of {@code sameThreadExecutor}. For heavier derivations, this
+ * choice carries some caveats: First, the thread that {@code function.apply}
+ * runs in depends on whether the input {@code Future} is done at the time
+ * {@code transform} is called. In particular, if called late, {@code
+ * transform} will run the operation in the thread that called {@code
+ * transform}. Second, {@code function.apply} may run in an internal thread
+ * of the system responsible for the input {@code Future}, such as an RPC
+ * network thread. Finally, during the execution of a {@code
+ * sameThreadExecutor} {@code function.apply}, all other registered but
+ * unexecuted listeners are prevented from running, even if those listeners
+ * are to run in other executors.
*
* @param input The future to transform
* @param function A function to transform the result of the input future
@@ -631,26 +411,19 @@ public final class Futures {
* transform(queryFuture, rowsFunction);
* }</pre>
*
- * Note: If the transformation is slow or heavyweight, consider {@linkplain
- * #transform(ListenableFuture, Function, Executor) supplying an executor}.
- * If you do not supply an executor, {@code transform} will use {@link
- * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
- * caveats for heavier operations. For example, the call to {@code
- * function.apply} may run on an unpredictable or undesirable thread:
- *
- * <ul>
- * <li>If the input {@code Future} is done at the time {@code transform} is
- * called, {@code transform} will call {@code function.apply} inline.
- * <li>If the input {@code Future} is not yet done, {@code transform} will
- * schedule {@code function.apply} to be run by the thread that completes the
- * input {@code Future}, which may be an internal system thread such as an
- * RPC network thread.
- * </ul>
- *
- * Also note that, regardless of which thread executes {@code
- * function.apply}, all other registered but unexecuted listeners are
- * prevented from running during its execution, even if those listeners are
- * to run in other executors.
+ * <p>Note: This overload of {@code transform} is designed for cases in which
+ * the transformation is fast and lightweight, as the method does not accept
+ * an {@code Executor} in which to perform the the work. For heavier
+ * transformations, this overload carries some caveats: First, the thread
+ * that the transformation runs in depends on whether the input {@code
+ * Future} is done at the time {@code transform} is called. In particular, if
+ * called late, {@code transform} will perform the transformation in the
+ * thread that called {@code transform}. Second, transformations may run in
+ * an internal thread of the system responsible for the input {@code Future},
+ * such as an RPC network thread. Finally, during the execution of a {@code
+ * sameThreadExecutor} transformation, all other registered but unexecuted
+ * listeners are prevented from running, even if those listeners are to run
+ * in other executors.
*
* <p>The returned {@code Future} attempts to keep its cancellation state in
* sync with that of the input future. That is, if the returned {@code Future}
@@ -661,16 +434,16 @@ public final class Futures {
* <p>An example use of this method is to convert a serializable object
* returned from an RPC into a POJO.
*
- * @param input The future to transform
+ * @param future The future to transform
* @param function A Function to transform the results of the provided future
* to the results of the returned future. This will be run in the thread
* that notifies input it is complete.
* @return A future that holds result of the transformation.
* @since 9.0 (in 1.0 as {@code compose})
*/
- public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
+ public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
final Function<? super I, ? extends O> function) {
- return transform(input, function, MoreExecutors.sameThreadExecutor());
+ return transform(future, function, MoreExecutors.sameThreadExecutor());
}
/**
@@ -699,29 +472,39 @@ public final class Futures {
* <p>An example use of this method is to convert a serializable object
* returned from an RPC into a POJO.
*
- * <p>When the transformation is fast and lightweight, consider {@linkplain
- * #transform(ListenableFuture, Function) omitting the executor} or
- * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
- * caveats documented in the link above.
- *
- * @param input The future to transform
+ * <p>Note: For cases in which the transformation is fast and lightweight,
+ * consider {@linkplain Futures#transform(ListenableFuture, Function) the
+ * other overload} or explicit use of {@link
+ * MoreExecutors#sameThreadExecutor}. For heavier transformations, this
+ * choice carries some caveats: First, the thread that the transformation
+ * runs in depends on whether the input {@code Future} is done at the time
+ * {@code transform} is called. In particular, if called late, {@code
+ * transform} will perform the transformation in the thread that called
+ * {@code transform}. Second, transformations may run in an internal thread
+ * of the system responsible for the input {@code Future}, such as an RPC
+ * network thread. Finally, during the execution of a {@code
+ * sameThreadExecutor} transformation, all other registered but unexecuted
+ * listeners are prevented from running, even if those listeners are to run
+ * in other executors.
+ *
+ * @param future The future to transform
* @param function A Function to transform the results of the provided future
* to the results of the returned future.
* @param executor Executor to run the function in.
* @return A future that holds result of the transformation.
* @since 9.0 (in 2.0 as {@code compose})
*/
- public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
+ public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
final Function<? super I, ? extends O> function, Executor executor) {
checkNotNull(function);
- AsyncFunction<I, O> wrapperFunction
- = new AsyncFunction<I, O>() {
+ Function<I, ListenableFuture<O>> wrapperFunction
+ = new Function<I, ListenableFuture<O>>() {
@Override public ListenableFuture<O> apply(I input) {
O output = function.apply(input);
return immediateFuture(output);
}
};
- return transform(input, wrapperFunction, executor);
+ return chain(future, wrapperFunction, executor);
}
/**
@@ -741,43 +524,43 @@ public final class Futures {
* who don't have a {@code ListenableFuture} available and
* do not mind repeated, lazy function evaluation.
*
- * @param input The future to transform
+ * @param future The future to transform
* @param function A Function to transform the results of the provided future
* to the results of the returned future.
* @return A future that returns the result of the transformation.
* @since 10.0
*/
@Beta
- public static <I, O> Future<O> lazyTransform(final Future<I> input,
+ public static <I, O> Future<O> lazyTransform(final Future<I> future,
final Function<? super I, ? extends O> function) {
- checkNotNull(input);
+ checkNotNull(future);
checkNotNull(function);
return new Future<O>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
- return input.cancel(mayInterruptIfRunning);
+ return future.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
- return input.isCancelled();
+ return future.isCancelled();
}
@Override
public boolean isDone() {
- return input.isDone();
+ return future.isDone();
}
@Override
public O get() throws InterruptedException, ExecutionException {
- return applyTransformation(input.get());
+ return applyTransformation(future.get());
}
@Override
public O get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
- return applyTransformation(input.get(timeout, unit));
+ return applyTransformation(future.get(timeout, unit));
}
private O applyTransformation(I input) throws ExecutionException {
@@ -806,6 +589,8 @@ public final class Futures {
private AsyncFunction<? super I, ? extends O> function;
private ListenableFuture<? extends I> inputFuture;
private volatile ListenableFuture<? extends O> outputFuture;
+ private final BlockingQueue<Boolean> mayInterruptIfRunningChannel =
+ new LinkedBlockingQueue<Boolean>(1);
private final CountDownLatch outputCreated = new CountDownLatch(1);
private ChainingListenableFuture(
@@ -815,6 +600,90 @@ public final class Futures {
this.inputFuture = checkNotNull(inputFuture);
}
+ /**
+ * Delegate the get() to the input and output futures, in case
+ * their implementations defer starting computation until their
+ * own get() is invoked.
+ */
+ @Override
+ public O get() throws InterruptedException, ExecutionException {
+ if (!isDone()) {
+ // Invoking get on the inputFuture will ensure our own run()
+ // method below is invoked as a listener when inputFuture sets
+ // its value. Therefore when get() returns we should then see
+ // the outputFuture be created.
+ ListenableFuture<? extends I> inputFuture = this.inputFuture;
+ if (inputFuture != null) {
+ inputFuture.get();
+ }
+
+ // If our listener was scheduled to run on an executor we may
+ // need to wait for our listener to finish running before the
+ // outputFuture has been constructed by the function.
+ outputCreated.await();
+
+ // Like above with the inputFuture, we have a listener on
+ // the outputFuture that will set our own value when its
+ // value is set. Invoking get will ensure the output can
+ // complete and invoke our listener, so that we can later
+ // get the result.
+ ListenableFuture<? extends O> outputFuture = this.outputFuture;
+ if (outputFuture != null) {
+ outputFuture.get();
+ }
+ }
+ return super.get();
+ }
+
+ /**
+ * Delegate the get() to the input and output futures, in case
+ * their implementations defer starting computation until their
+ * own get() is invoked.
+ */
+ @Override
+ public O get(long timeout, TimeUnit unit) throws TimeoutException,
+ ExecutionException, InterruptedException {
+ if (!isDone()) {
+ // Use a single time unit so we can decrease remaining timeout
+ // as we wait for various phases to complete.
+ if (unit != NANOSECONDS) {
+ timeout = NANOSECONDS.convert(timeout, unit);
+ unit = NANOSECONDS;
+ }
+
+ // Invoking get on the inputFuture will ensure our own run()
+ // method below is invoked as a listener when inputFuture sets
+ // its value. Therefore when get() returns we should then see
+ // the outputFuture be created.
+ ListenableFuture<? extends I> inputFuture = this.inputFuture;
+ if (inputFuture != null) {
+ long start = System.nanoTime();
+ inputFuture.get(timeout, unit);
+ timeout -= Math.max(0, System.nanoTime() - start);
+ }
+
+ // If our listener was scheduled to run on an executor we may
+ // need to wait for our listener to finish running before the
+ // outputFuture has been constructed by the function.
+ long start = System.nanoTime();
+ if (!outputCreated.await(timeout, unit)) {
+ throw new TimeoutException();
+ }
+ timeout -= Math.max(0, System.nanoTime() - start);
+
+ // Like above with the inputFuture, we have a listener on
+ // the outputFuture that will set our own value when its
+ // value is set. Invoking get will ensure the output can
+ // complete and invoke our listener, so that we can later
+ // get the result.
+ ListenableFuture<? extends O> outputFuture = this.outputFuture;
+ if (outputFuture != null) {
+ outputFuture.get(timeout, unit);
+ }
+ }
+ return super.get(timeout, unit);
+ }
+
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
/*
@@ -824,6 +693,7 @@ public final class Futures {
if (super.cancel(mayInterruptIfRunning)) {
// This should never block since only one thread is allowed to cancel
// this Future.
+ putUninterruptibly(mayInterruptIfRunningChannel, mayInterruptIfRunning);
cancel(inputFuture, mayInterruptIfRunning);
cancel(outputFuture, mayInterruptIfRunning);
return true;
@@ -859,7 +729,13 @@ public final class Futures {
final ListenableFuture<? extends O> outputFuture = this.outputFuture =
function.apply(sourceResult);
if (isCancelled()) {
- outputFuture.cancel(wasInterrupted());
+ // Handles the case where cancel was called while the function was
+ // being applied.
+ // There is a gap in cancel(boolean) between calling sync.cancel()
+ // and storing the value of mayInterruptIfRunning, so this thread
+ // needs to block, waiting for that value.
+ outputFuture.cancel(
+ takeUninterruptibly(mayInterruptIfRunningChannel));
this.outputFuture = null;
return;
}
@@ -907,52 +783,14 @@ public final class Futures {
}
/**
- * Returns a new {@code ListenableFuture} whose result is the product of
- * calling {@code get()} on the {@code Future} nested within the given {@code
- * Future}, effectively chaining the futures one after the other. Example:
- *
- * <pre> {@code
- * SettableFuture<ListenableFuture<String>> nested = SettableFuture.create();
- * ListenableFuture<String> dereferenced = dereference(nested);
- * }</pre>
- *
- * <p>This call has the same cancellation and execution semantics as {@link
- * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code
- * Future} attempts to keep its cancellation state in sync with both the
- * input {@code Future} and the nested {@code Future}. The transformation
- * is very lightweight and therefore takes place in the thread that called
- * {@code dereference}.
- *
- * @param nested The nested future to transform.
- * @return A future that holds result of the inner future.
- * @since 13.0
- */
- @Beta
- @SuppressWarnings({"rawtypes", "unchecked"})
- public static <V> ListenableFuture<V> dereference(
- ListenableFuture<? extends ListenableFuture<? extends V>> nested) {
- return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER);
- }
-
- /**
- * Helper {@code Function} for {@link #dereference}.
- */
- private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER =
- new AsyncFunction<ListenableFuture<Object>, Object>() {
- @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) {
- return input;
- }
- };
-
- /**
* Creates a new {@code ListenableFuture} whose value is a list containing the
* values of all its input futures, if all succeed. If any input fails, the
* returned future fails.
*
* <p>The list of results is in the same order as the input list.
*
- * <p>Canceling this future will attempt to cancel all the component futures,
- * and if any of the provided futures fails or is canceled, this one is,
+ * <p>Canceling this future does not cancel any of the component futures;
+ * however, if any of the provided futures fails or is canceled, this one is,
* too.
*
* @param futures futures to combine
@@ -963,7 +801,7 @@ public final class Futures {
@Beta
public static <V> ListenableFuture<List<V>> allAsList(
ListenableFuture<? extends V>... futures) {
- return listFuture(ImmutableList.copyOf(futures), true,
+ return new ListFuture<V>(ImmutableList.copyOf(futures), true,
MoreExecutors.sameThreadExecutor());
}
@@ -974,8 +812,8 @@ public final class Futures {
*
* <p>The list of results is in the same order as the input list.
*
- * <p>Canceling this future will attempt to cancel all the component futures,
- * and if any of the provided futures fails or is canceled, this one is,
+ * <p>Canceling this future does not cancel any of the component futures;
+ * however, if any of the provided futures fails or is canceled, this one is,
* too.
*
* @param futures futures to combine
@@ -986,7 +824,7 @@ public final class Futures {
@Beta
public static <V> ListenableFuture<List<V>> allAsList(
Iterable<? extends ListenableFuture<? extends V>> futures) {
- return listFuture(ImmutableList.copyOf(futures), true,
+ return new ListFuture<V>(ImmutableList.copyOf(futures), true,
MoreExecutors.sameThreadExecutor());
}
@@ -998,8 +836,6 @@ public final class Futures {
* indistinguishable from the future having a successful value of
* {@code null}).
*
- * <p>Canceling this future will attempt to cancel all the component futures.
- *
* @param futures futures to combine
* @return a future that provides a list of the results of the component
* futures
@@ -1008,7 +844,7 @@ public final class Futures {
@Beta
public static <V> ListenableFuture<List<V>> successfulAsList(
ListenableFuture<? extends V>... futures) {
- return listFuture(ImmutableList.copyOf(futures), false,
+ return new ListFuture<V>(ImmutableList.copyOf(futures), false,
MoreExecutors.sameThreadExecutor());
}
@@ -1020,8 +856,6 @@ public final class Futures {
* indistinguishable from the future having a successful value of
* {@code null}).
*
- * <p>Canceling this future will attempt to cancel all the component futures.
- *
* @param futures futures to combine
* @return a future that provides a list of the results of the component
* futures
@@ -1030,7 +864,7 @@ public final class Futures {
@Beta
public static <V> ListenableFuture<List<V>> successfulAsList(
Iterable<? extends ListenableFuture<? extends V>> futures) {
- return listFuture(ImmutableList.copyOf(futures), false,
+ return new ListFuture<V>(ImmutableList.copyOf(futures), false,
MoreExecutors.sameThreadExecutor());
}
@@ -1055,26 +889,19 @@ public final class Futures {
* }
* });}</pre>
*
- * Note: If the callback is slow or heavyweight, consider {@linkplain
- * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an
- * executor}. If you do not supply an executor, {@code addCallback} will use
- * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries
- * some caveats for heavier operations. For example, the callback may run on
- * an unpredictable or undesirable thread:
- *
- * <ul>
- * <li>If the input {@code Future} is done at the time {@code addCallback} is
- * called, {@code addCallback} will execute the callback inline.
- * <li>If the input {@code Future} is not yet done, {@code addCallback} will
- * schedule the callback to be run by the thread that completes the input
- * {@code Future}, which may be an internal system thread such as an RPC
- * network thread.
- * </ul>
- *
- * Also note that, regardless of which thread executes the callback, all
- * other registered but unexecuted listeners are prevented from running
- * during its execution, even if those listeners are to run in other
- * executors.
+ * <p>Note: This overload of {@code addCallback} is designed for cases in
+ * which the callack is fast and lightweight, as the method does not accept
+ * an {@code Executor} in which to perform the the work. For heavier
+ * callbacks, this overload carries some caveats: First, the thread that the
+ * callback runs in depends on whether the input {@code Future} is done at the
+ * time {@code addCallback} is called and on whether the input {@code Future}
+ * is ever cancelled. In particular, {@code addCallback} may execute the
+ * callback in the thread that calls {@code addCallback} or {@code
+ * Future.cancel}. Second, callbacks may run in an internal thread of the
+ * system responsible for the input {@code Future}, such as an RPC network
+ * thread. Finally, during the execution of a {@code sameThreadExecutor}
+ * callback, all other registered but unexecuted listeners are prevented from
+ * running, even if those listeners are to run in other executors.
*
* <p>For a more general interface to attach a completion listener to a
* {@code Future}, see {@link ListenableFuture#addListener addListener}.
@@ -1111,10 +938,20 @@ public final class Futures {
* }
* });}</pre>
*
- * When the callback is fast and lightweight, consider {@linkplain
- * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or
- * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
- * caveats documented in the link above.
+ * When the callback is fast and lightweight consider {@linkplain
+ * Futures#addCallback(ListenableFuture, FutureCallback) the other overload}
+ * or explicit use of {@link MoreExecutors#sameThreadExecutor
+ * sameThreadExecutor}. For heavier callbacks, this choice carries some
+ * caveats: First, the thread that the callback runs in depends on whether
+ * the input {@code Future} is done at the time {@code addCallback} is called
+ * and on whether the input {@code Future} is ever cancelled. In particular,
+ * {@code addCallback} may execute the callback in the thread that calls
+ * {@code addCallback} or {@code Future.cancel}. Second, callbacks may run in
+ * an internal thread of the system responsible for the input {@code Future},
+ * such as an RPC network thread. Finally, during the execution of a {@code
+ * sameThreadExecutor} callback, all other registered but unexecuted
+ * listeners are prevented from running, even if those listeners are to run
+ * in other executors.
*
* <p>For a more general interface to attach a completion listener to a
* {@code Future}, see {@link ListenableFuture#addListener addListener}.
@@ -1131,22 +968,18 @@ public final class Futures {
Runnable callbackListener = new Runnable() {
@Override
public void run() {
- final V value;
try {
// TODO(user): (Before Guava release), validate that this
// is the thing for IE.
- value = getUninterruptibly(future);
+ V value = getUninterruptibly(future);
+ callback.onSuccess(value);
} catch (ExecutionException e) {
callback.onFailure(e.getCause());
- return;
} catch (RuntimeException e) {
callback.onFailure(e);
- return;
} catch (Error e) {
callback.onFailure(e);
- return;
}
- callback.onSuccess(value);
}
};
future.addListener(callbackListener, executor);
@@ -1434,53 +1267,49 @@ public final class Futures {
}
}
- private interface FutureCombiner<V, C> {
- C combine(List<Optional<V>> values);
- }
-
- private static class CombinedFuture<V, C> extends AbstractFuture<C> {
- ImmutableCollection<? extends ListenableFuture<? extends V>> futures;
+ /**
+ * Class that implements {@link #allAsList} and {@link #successfulAsList}.
+ * The idea is to create a (null-filled) List and register a listener with
+ * each component future to fill out the value in the List when that future
+ * completes.
+ */
+ private static class ListFuture<V> extends AbstractFuture<List<V>> {
+ ImmutableList<? extends ListenableFuture<? extends V>> futures;
final boolean allMustSucceed;
final AtomicInteger remaining;
- FutureCombiner<V, C> combiner;
- List<Optional<V>> values;
+ List<V> values;
- CombinedFuture(
- ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
- boolean allMustSucceed, Executor listenerExecutor,
- FutureCombiner<V, C> combiner) {
+ /**
+ * Constructor.
+ *
+ * @param futures all the futures to build the list from
+ * @param allMustSucceed whether a single failure or cancellation should
+ * propagate to this future
+ * @param listenerExecutor used to run listeners on all the passed in
+ * futures.
+ */
+ ListFuture(
+ final ImmutableList<? extends ListenableFuture<? extends V>> futures,
+ final boolean allMustSucceed, final Executor listenerExecutor) {
this.futures = futures;
+ this.values = Lists.newArrayListWithCapacity(futures.size());
this.allMustSucceed = allMustSucceed;
this.remaining = new AtomicInteger(futures.size());
- this.combiner = combiner;
- this.values = Lists.newArrayListWithCapacity(futures.size());
+
init(listenerExecutor);
}
- /**
- * Must be called at the end of the constructor.
- */
- protected void init(final Executor listenerExecutor) {
+ private void init(final Executor listenerExecutor) {
// First, schedule cleanup to execute when the Future is done.
addListener(new Runnable() {
@Override
public void run() {
- // Cancel all the component futures.
- if (CombinedFuture.this.isCancelled()) {
- for (ListenableFuture<?> future : CombinedFuture.this.futures) {
- future.cancel(CombinedFuture.this.wasInterrupted());
- }
- }
-
// By now the values array has either been set as the Future's value,
// or (in case of failure) is no longer useful.
- CombinedFuture.this.futures = null;
+ ListFuture.this.values = null;
// Let go of the memory held by other futures
- CombinedFuture.this.values = null;
-
- // The combiner may also hold state, so free that as well
- CombinedFuture.this.combiner = null;
+ ListFuture.this.futures = null;
}
}, MoreExecutors.sameThreadExecutor());
@@ -1488,7 +1317,7 @@ public final class Futures {
// Corner case: List is empty.
if (futures.isEmpty()) {
- set(combiner.combine(ImmutableList.<Optional<V>>of()));
+ set(Lists.newArrayList(values));
return;
}
@@ -1503,11 +1332,11 @@ public final class Futures {
// this loop, the last call to addListener() will callback to
// setOneValue(), transitively call our cleanup listener, and set
// this.futures to null.
- // This is not actually a problem, since the foreach only needs
- // this.futures to be non-null at the beginning of the loop.
- int i = 0;
- for (final ListenableFuture<? extends V> listenable : futures) {
- final int index = i++;
+ // We store a reference to futures to avoid the NPE.
+ ImmutableList<? extends ListenableFuture<? extends V>> localFutures = futures;
+ for (int i = 0; i < localFutures.size(); i++) {
+ final ListenableFuture<? extends V> listenable = localFutures.get(i);
+ final int index = i;
listenable.addListener(new Runnable() {
@Override
public void run() {
@@ -1521,12 +1350,12 @@ public final class Futures {
* Sets the value at the given index to that of the given future.
*/
private void setOneValue(int index, Future<? extends V> future) {
- List<Optional<V>> localValues = values;
+ List<V> localValues = values;
if (isDone() || localValues == null) {
// Some other future failed or has been cancelled, causing this one to
// also be cancelled or have an exception set. This should only happen
- // if allMustSucceed is true or if the output itself has been cancelled.
- checkState(allMustSucceed || isCancelled(),
+ // if allMustSucceed is true.
+ checkState(allMustSucceed,
"Future was done before all dependencies completed");
return;
}
@@ -1534,8 +1363,7 @@ public final class Futures {
try {
checkState(future.isDone(),
"Tried to set value from future which is not done");
- V returnValue = getUninterruptibly(future);
- localValues.set(index, Optional.fromNullable(returnValue));
+ localValues.set(index, getUninterruptibly(future));
} catch (CancellationException e) {
if (allMustSucceed) {
// Set ourselves as cancelled. Let the input futures keep running
@@ -1561,9 +1389,9 @@ public final class Futures {
int newRemaining = remaining.decrementAndGet();
checkState(newRemaining >= 0, "Less than 0 remaining futures");
if (newRemaining == 0) {
- FutureCombiner<V, C> localCombiner = combiner;
- if (localCombiner != null) {
- set(localCombiner.combine(localValues));
+ localValues = values;
+ if (localValues != null) {
+ set(Lists.newArrayList(localValues));
} else {
checkState(isDone());
}
@@ -1571,25 +1399,46 @@ public final class Futures {
}
}
- }
+ @Override
+ public List<V> get() throws InterruptedException, ExecutionException {
+ callAllGets();
- /** Used for {@link #allAsList} and {@link #successfulAsList}. */
- private static <V> ListenableFuture<List<V>> listFuture(
- ImmutableList<ListenableFuture<? extends V>> futures,
- boolean allMustSucceed, Executor listenerExecutor) {
- return new CombinedFuture<V, List<V>>(
- futures, allMustSucceed, listenerExecutor,
- new FutureCombiner<V, List<V>>() {
- @Override
- public List<V> combine(List<Optional<V>> values) {
- List<V> result = Lists.newArrayList();
- for (Optional<V> element : values) {
- result.add(element != null ? element.orNull() : null);
+ // This may still block in spite of the calls above, as the listeners may
+ // be scheduled for execution in other threads.
+ return super.get();
+ }
+
+ /**
+ * Calls the get method of all dependency futures to work around a bug in
+ * some ListenableFutures where the listeners aren't called until get() is
+ * called.
+ */
+ private void callAllGets() throws InterruptedException {
+ List<? extends ListenableFuture<? extends V>> oldFutures = futures;
+ if (oldFutures != null && !isDone()) {
+ for (ListenableFuture<? extends V> future : oldFutures) {
+ // We wait for a little while for the future, but if it's not done,
+ // we check that no other futures caused a cancellation or failure.
+ // This can introduce a delay of up to 10ms in reporting an exception.
+ while (!future.isDone()) {
+ try {
+ future.get();
+ } catch (Error e) {
+ throw e;
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (Throwable e) {
+ // ExecutionException / CancellationException / RuntimeException
+ if (allMustSucceed) {
+ return;
+ } else {
+ continue;
+ }
}
- // TODO(user): This should ultimately return an unmodifiableList
- return result;
}
- });
+ }
+ }
+ }
}
/**
diff --git a/guava/src/com/google/common/util/concurrent/JdkFutureAdapters.java b/guava/src/com/google/common/util/concurrent/JdkFutureAdapters.java
index 645a648..6d74bda 100644
--- a/guava/src/com/google/common/util/concurrent/JdkFutureAdapters.java
+++ b/guava/src/com/google/common/util/concurrent/JdkFutureAdapters.java
@@ -19,6 +19,7 @@ package com.google.common.util.concurrent;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -41,7 +42,7 @@ public final class JdkFutureAdapters {
* Assigns a thread to the given {@link Future} to provide {@link
* ListenableFuture} functionality.
*
- * <p><b>Warning:</b> If the input future does not already implement {@code
+ * <p><b>Warning:</b> If the input future does not already implement {@link
* ListenableFuture}, the returned future will emulate {@link
* ListenableFuture#addListener} by taking a thread from an internal,
* unbounded pool at the first call to {@code addListener} and holding it
@@ -56,40 +57,17 @@ public final class JdkFutureAdapters {
*/
public static <V> ListenableFuture<V> listenInPoolThread(
Future<V> future) {
- if (future instanceof ListenableFuture) {
+ if (future instanceof ListenableFuture<?>) {
return (ListenableFuture<V>) future;
}
return new ListenableFutureAdapter<V>(future);
}
- /**
- * Submits a blocking task for the given {@link Future} to provide {@link
- * ListenableFuture} functionality.
- *
- * <p><b>Warning:</b> If the input future does not already implement {@code
- * ListenableFuture}, the returned future will emulate {@link
- * ListenableFuture#addListener} by submitting a task to the given executor at
- * at the first call to {@code addListener}. The task must be started by the
- * executor promptly, or else the returned {@code ListenableFuture} may fail
- * to work. The task's execution consists of blocking until the input future
- * is {@linkplain Future#isDone() done}, so each call to this method may
- * claim and hold a thread for an arbitrary length of time. Use of bounded
- * executors or other executors that may fail to execute a task promptly may
- * result in deadlocks.
- *
- * <p>Prefer to create {@code ListenableFuture} instances with {@link
- * SettableFuture}, {@link MoreExecutors#listeningDecorator(
- * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
- * {@link AbstractFuture}, and other utilities over creating plain {@code
- * Future} instances to be upgraded to {@code ListenableFuture} after the
- * fact.
- *
- * @since 12.0
- */
- public static <V> ListenableFuture<V> listenInPoolThread(
+ @VisibleForTesting
+ static <V> ListenableFuture<V> listenInPoolThread(
Future<V> future, Executor executor) {
checkNotNull(executor);
- if (future instanceof ListenableFuture) {
+ if (future instanceof ListenableFuture<?>) {
return (ListenableFuture<V>) future;
}
return new ListenableFutureAdapter<V>(future, executor);
diff --git a/guava/src/com/google/common/util/concurrent/ListenableFuture.java b/guava/src/com/google/common/util/concurrent/ListenableFuture.java
index eb05354..a0ab2db 100644
--- a/guava/src/com/google/common/util/concurrent/ListenableFuture.java
+++ b/guava/src/com/google/common/util/concurrent/ListenableFuture.java
@@ -28,10 +28,6 @@ import java.util.concurrent.RejectedExecutionException;
* computation is {@linkplain Future#isDone() complete}. If the computation has
* already completed when the listener is added, the listener will execute
* immediately.
- *
- * <p>See the Guava User Guide article on <a href=
- * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained">
- * {@code ListenableFuture}</a>.
*
* <h3>Purpose</h3>
*
@@ -102,28 +98,20 @@ public interface ListenableFuture<V> extends Future<V> {
*
* <p>Note: For fast, lightweight listeners that would be safe to execute in
* any thread, consider {@link MoreExecutors#sameThreadExecutor}. For heavier
- * listeners, {@code sameThreadExecutor()} carries some caveats. For
- * example, the listener may run on an unpredictable or undesirable thread:
+ * listeners, {@code sameThreadExecutor()} carries some caveats: First, the
+ * thread that the listener runs in depends on whether the {@code Future} is
+ * done at the time it is added and on whether it is ever canclled. In
+ * particular, listeners may run in the thread that calls {@code addListener}
+ * or the thread that calls {@code cancel}. Second, listeners may run in an
+ * internal thread of the system responsible for the input {@code Future},
+ * such as an RPC network thread. Finally, during the execution of a {@code
+ * sameThreadExecutor()} listener, all other registered but unexecuted
+ * listeners are prevented from running, even if those listeners are to run
+ * in other executors.
*
- * <ul>
- * <li>If the input {@code Future} is done at the time {@code addListener} is
- * called, {@code addListener} will execute the listener inline.
- * <li>If the input {@code Future} is not yet done, {@code addListener} will
- * schedule the listener to be run by the thread that completes the input
- * {@code Future}, which may be an internal system thread such as an RPC
- * network thread.
- * </ul>
- *
- * Also note that, regardless of which thread executes the listener, all
- * other registered but unexecuted listeners are prevented from running
- * during its execution, even if those listeners are to run in other
- * executors.
- *
- * <p>This is the most general listener interface. For common operations
- * performed using listeners, see {@link
- * com.google.common.util.concurrent.Futures}. For a simplified but general
- * listener interface, see {@link
- * com.google.common.util.concurrent.Futures#addCallback addCallback()}.
+ * <p>This is the most general listener interface.
+ * For common operations performed using listeners,
+ * see {@link com.google.common.util.concurrent.Futures}
*
* @param listener the listener to run when the computation is complete
* @param executor the executor to run the listener in
diff --git a/guava/src/com/google/common/util/concurrent/ListenableFutureTask.java b/guava/src/com/google/common/util/concurrent/ListenableFutureTask.java
index 35d6f13..474635c 100644
--- a/guava/src/com/google/common/util/concurrent/ListenableFutureTask.java
+++ b/guava/src/com/google/common/util/concurrent/ListenableFutureTask.java
@@ -27,17 +27,12 @@ import javax.annotation.Nullable;
* interface. Unlike {@code FutureTask}, {@code ListenableFutureTask} does not
* provide an overrideable {@link FutureTask#done() done()} method. For similar
* functionality, call {@link #addListener}.
- *
- * <p>
*
* @author Sven Mawson
* @since 1.0
*/
-public class ListenableFutureTask<V> extends FutureTask<V>
+public final class ListenableFutureTask<V> extends FutureTask<V>
implements ListenableFuture<V> {
- // TODO(cpovirk): explore ways of making ListenableFutureTask final. There are
- // some valid reasons such as BoundedQueueExecutorService to allow extends but it
- // would be nice to make it final to avoid unintended usage.
// The execution list to hold our listeners.
private final ExecutionList executionList = new ExecutionList();
@@ -70,11 +65,11 @@ public class ListenableFutureTask<V> extends FutureTask<V>
return new ListenableFutureTask<V>(runnable, result);
}
- ListenableFutureTask(Callable<V> callable) {
+ private ListenableFutureTask(Callable<V> callable) {
super(callable);
}
- ListenableFutureTask(Runnable runnable, @Nullable V result) {
+ private ListenableFutureTask(Runnable runnable, @Nullable V result) {
super(runnable, result);
}
diff --git a/guava/src/com/google/common/util/concurrent/MoreExecutors.java b/guava/src/com/google/common/util/concurrent/MoreExecutors.java
index bd94db7..915b96d 100644
--- a/guava/src/com/google/common/util/concurrent/MoreExecutors.java
+++ b/guava/src/com/google/common/util/concurrent/MoreExecutors.java
@@ -16,26 +16,15 @@
package com.google.common.util.concurrent;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.Beta;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -44,7 +33,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -79,8 +67,16 @@ public final class MoreExecutors {
@Beta
public static ExecutorService getExitingExecutorService(
ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
- return new Application()
- .getExitingExecutorService(executor, terminationTimeout, timeUnit);
+ executor.setThreadFactory(new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setThreadFactory(executor.getThreadFactory())
+ .build());
+
+ ExecutorService service = Executors.unconfigurableExecutorService(executor);
+
+ addDelayedShutdownHook(service, terminationTimeout, timeUnit);
+
+ return service;
}
/**
@@ -101,9 +97,19 @@ public final class MoreExecutors {
*/
@Beta
public static ScheduledExecutorService getExitingScheduledExecutorService(
- ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
- return new Application()
- .getExitingScheduledExecutorService(executor, terminationTimeout, timeUnit);
+ ScheduledThreadPoolExecutor executor, long terminationTimeout,
+ TimeUnit timeUnit) {
+ executor.setThreadFactory(new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setThreadFactory(executor.getThreadFactory())
+ .build());
+
+ ScheduledExecutorService service =
+ Executors.unconfigurableScheduledExecutorService(executor);
+
+ addDelayedShutdownHook(service, terminationTimeout, timeUnit);
+
+ return service;
}
/**
@@ -119,9 +125,24 @@ public final class MoreExecutors {
*/
@Beta
public static void addDelayedShutdownHook(
- ExecutorService service, long terminationTimeout, TimeUnit timeUnit) {
- new Application()
- .addDelayedShutdownHook(service, terminationTimeout, timeUnit);
+ final ExecutorService service, final long terminationTimeout,
+ final TimeUnit timeUnit) {
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // We'd like to log progress and failures that may arise in the
+ // following code, but unfortunately the behavior of logging
+ // is undefined in shutdown hooks.
+ // This is because the logging code installs a shutdown hook of its
+ // own. See Cleaner class inside {@link LogManager}.
+ service.shutdown();
+ service.awaitTermination(terminationTimeout, timeUnit);
+ } catch (InterruptedException ignored) {
+ // We're shutting down anyway, so just ignore.
+ }
+ }
+ }));
}
/**
@@ -140,8 +161,9 @@ public final class MoreExecutors {
* @return an unmodifiable version of the input which will not hang the JVM
*/
@Beta
- public static ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
- return new Application().getExitingExecutorService(executor);
+ public static ExecutorService getExitingExecutorService(
+ ThreadPoolExecutor executor) {
+ return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
}
/**
@@ -162,69 +184,7 @@ public final class MoreExecutors {
@Beta
public static ScheduledExecutorService getExitingScheduledExecutorService(
ScheduledThreadPoolExecutor executor) {
- return new Application().getExitingScheduledExecutorService(executor);
- }
-
- /** Represents the current application to register shutdown hooks. */
- @VisibleForTesting static class Application {
-
- final ExecutorService getExitingExecutorService(
- ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
- useDaemonThreadFactory(executor);
- ExecutorService service = Executors.unconfigurableExecutorService(executor);
- addDelayedShutdownHook(service, terminationTimeout, timeUnit);
- return service;
- }
-
- final ScheduledExecutorService getExitingScheduledExecutorService(
- ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
- useDaemonThreadFactory(executor);
- ScheduledExecutorService service = Executors.unconfigurableScheduledExecutorService(executor);
- addDelayedShutdownHook(service, terminationTimeout, timeUnit);
- return service;
- }
-
- final void addDelayedShutdownHook(
- final ExecutorService service, final long terminationTimeout, final TimeUnit timeUnit) {
- checkNotNull(service);
- checkNotNull(timeUnit);
- addShutdownHook(MoreExecutors.newThread("DelayedShutdownHook-for-" + service, new Runnable() {
- @Override
- public void run() {
- try {
- // We'd like to log progress and failures that may arise in the
- // following code, but unfortunately the behavior of logging
- // is undefined in shutdown hooks.
- // This is because the logging code installs a shutdown hook of its
- // own. See Cleaner class inside {@link LogManager}.
- service.shutdown();
- service.awaitTermination(terminationTimeout, timeUnit);
- } catch (InterruptedException ignored) {
- // We're shutting down anyway, so just ignore.
- }
- }
- }));
- }
-
- final ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
- return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
- }
-
- final ScheduledExecutorService getExitingScheduledExecutorService(
- ScheduledThreadPoolExecutor executor) {
- return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
- }
-
- @VisibleForTesting void addShutdownHook(Thread hook) {
- Runtime.getRuntime().addShutdownHook(hook);
- }
- }
-
- private static void useDaemonThreadFactory(ThreadPoolExecutor executor) {
- executor.setThreadFactory(new ThreadFactoryBuilder()
- .setDaemon(true)
- .setThreadFactory(executor.getThreadFactory())
- .build());
+ return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
}
/**
@@ -483,7 +443,6 @@ public final class MoreExecutors {
private static class ScheduledListeningDecorator
extends ListeningDecorator implements ListeningScheduledExecutorService {
- @SuppressWarnings("hiding")
final ScheduledExecutorService delegate;
ScheduledListeningDecorator(ScheduledExecutorService delegate) {
@@ -516,172 +475,4 @@ public final class MoreExecutors {
command, initialDelay, delay, unit);
}
}
-
- /*
- * This following method is a modified version of one found in
- * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30
- * which contained the following notice:
- *
- * Written by Doug Lea with assistance from members of JCP JSR-166
- * Expert Group and released to the public domain, as explained at
- * http://creativecommons.org/publicdomain/zero/1.0/
- * Other contributors include Andrew Wright, Jeffrey Hayes,
- * Pat Fisher, Mike Judd.
- */
-
- /**
- * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}
- * implementations.
- */ static <T> T invokeAnyImpl(ListeningExecutorService executorService,
- Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
- throws InterruptedException, ExecutionException, TimeoutException {
- checkNotNull(executorService);
- int ntasks = tasks.size();
- checkArgument(ntasks > 0);
- List<Future<T>> futures = Lists.newArrayListWithCapacity(ntasks);
- BlockingQueue<Future<T>> futureQueue = Queues.newLinkedBlockingQueue();
-
- // For efficiency, especially in executors with limited
- // parallelism, check to see if previously submitted tasks are
- // done before submitting more of them. This interleaving
- // plus the exception mechanics account for messiness of main
- // loop.
-
- try {
- // Record exceptions so that if we fail to obtain any
- // result, we can throw the last exception we got.
- ExecutionException ee = null;
- long lastTime = timed ? System.nanoTime() : 0;
- Iterator<? extends Callable<T>> it = tasks.iterator();
-
- futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
- --ntasks;
- int active = 1;
-
- for (;;) {
- Future<T> f = futureQueue.poll();
- if (f == null) {
- if (ntasks > 0) {
- --ntasks;
- futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
- ++active;
- } else if (active == 0) {
- break;
- } else if (timed) {
- f = futureQueue.poll(nanos, TimeUnit.NANOSECONDS);
- if (f == null) {
- throw new TimeoutException();
- }
- long now = System.nanoTime();
- nanos -= now - lastTime;
- lastTime = now;
- } else {
- f = futureQueue.take();
- }
- }
- if (f != null) {
- --active;
- try {
- return f.get();
- } catch (ExecutionException eex) {
- ee = eex;
- } catch (RuntimeException rex) {
- ee = new ExecutionException(rex);
- }
- }
- }
-
- if (ee == null) {
- ee = new ExecutionException(null);
- }
- throw ee;
- } finally {
- for (Future<T> f : futures) {
- f.cancel(true);
- }
- }
- }
-
- /**
- * Submits the task and adds a listener that adds the future to {@code queue} when it completes.
- */
- private static <T> ListenableFuture<T> submitAndAddQueueListener(
- ListeningExecutorService executorService, Callable<T> task,
- final BlockingQueue<Future<T>> queue) {
- final ListenableFuture<T> future = executorService.submit(task);
- future.addListener(new Runnable() {
- @Override public void run() {
- queue.add(future);
- }
- }, MoreExecutors.sameThreadExecutor());
- return future;
- }
-
- /**
- * Returns a default thread factory used to create new threads.
- *
- * <p>On AppEngine, returns {@code ThreadManager.currentRequestThreadFactory()}.
- * Otherwise, returns {@link Executors#defaultThreadFactory()}.
- *
- * @since 14.0
- */
- @Beta
- public static ThreadFactory platformThreadFactory() {
- if (!isAppEngine()) {
- return Executors.defaultThreadFactory();
- }
- try {
- return (ThreadFactory) Class.forName("com.google.appengine.api.ThreadManager")
- .getMethod("currentRequestThreadFactory")
- .invoke(null);
- } catch (IllegalAccessException e) {
- throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
- } catch (NoSuchMethodException e) {
- throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
- } catch (InvocationTargetException e) {
- throw Throwables.propagate(e.getCause());
- }
- }
-
- private static boolean isAppEngine() {
- if (System.getProperty("com.google.appengine.runtime.environment") == null) {
- return false;
- }
- try {
- // If the current environment is null, we're not inside AppEngine.
- return Class.forName("com.google.apphosting.api.ApiProxy")
- .getMethod("getCurrentEnvironment")
- .invoke(null) != null;
- } catch (ClassNotFoundException e) {
- // If ApiProxy doesn't exist, we're not on AppEngine at all.
- return false;
- } catch (InvocationTargetException e) {
- // If ApiProxy throws an exception, we're not in a proper AppEngine environment.
- return false;
- } catch (IllegalAccessException e) {
- // If the method isn't accessible, we're not on a supported version of AppEngine;
- return false;
- } catch (NoSuchMethodException e) {
- // If the method doesn't exist, we're not on a supported version of AppEngine;
- return false;
- }
- }
-
- /**
- * Creates a thread using {@link #platformThreadFactory}, and sets its name to {@code name}
- * unless changing the name is forbidden by the security manager.
- */
- static Thread newThread(String name, Runnable runnable) {
- checkNotNull(name);
- checkNotNull(runnable);
- Thread result = platformThreadFactory().newThread(runnable);
- try {
- result.setName(name);
- } catch (SecurityException e) {
- // OK if we can't set the name in this environment.
- }
- return result;
- }
}
diff --git a/guava/src/com/google/common/util/concurrent/RateLimiter.java b/guava/src/com/google/common/util/concurrent/RateLimiter.java
deleted file mode 100644
index 4085654..0000000
--- a/guava/src/com/google/common/util/concurrent/RateLimiter.java
+++ /dev/null
@@ -1,690 +0,0 @@
-/*
- * Copyright (C) 2012 The Guava Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.common.util.concurrent;
-
-import com.google.common.annotations.Beta;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Ticker;
-
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * A rate limiter. Conceptually, a rate limiter distributes permits at a
- * configurable rate. Each {@link #acquire()} blocks if necessary until a permit is
- * available, and then takes it. Once acquired, permits need not be released.
- *
- * <p>Rate limiters are often used to restrict the rate at which some
- * physical or logical resource is accessed. This is in contrast to {@link
- * java.util.concurrent.Semaphore} which restricts the number of concurrent
- * accesses instead of the rate (note though that concurrency and rate are closely related,
- * e.g. see <a href="http://en.wikipedia.org/wiki/Little's_law">Little's Law</a>).
- *
- * <p>A {@code RateLimiter} is defined primarily by the rate at which permits
- * are issued. Absent additional configuration, permits will be distributed at a
- * fixed rate, defined in terms of permits per second. Permits will be distributed
- * smoothly, with the delay between individual permits being adjusted to ensure
- * that the configured rate is maintained.
- *
- * <p>It is possible to configure a {@code RateLimiter} to have a warmup
- * period during which time the permits issued each second steadily increases until
- * it hits the stable rate.
- *
- * <p>As an example, imagine that we have a list of tasks to execute, but we don't want to
- * submit more than 2 per second:
- *<pre> {@code
- * final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second"
- * void submitTasks(List<Runnable> tasks, Executor executor) {
- * for (Runnable task : tasks) {
- * rateLimiter.acquire(); // may wait
- * executor.execute(task);
- * }
- * }
- *}</pre>
- *
- * <p>As another example, imagine that we produce a stream of data, and we want to cap it
- * at 5kb per second. This could be accomplished by requiring a permit per byte, and specifying
- * a rate of 5000 permits per second:
- *<pre> {@code
- * final RateLimiter rateLimiter = RateLimiter.create(5000.0); // rate = 5000 permits per second
- * void submitPacket(byte[] packet) {
- * rateLimiter.acquire(packet.length);
- * networkService.send(packet);
- * }
- *}</pre>
- *
- * <p>It is important to note that the number of permits requested <i>never</i>
- * affect the throttling of the request itself (an invocation to {@code acquire(1)}
- * and an invocation to {@code acquire(1000)} will result in exactly the same throttling, if any),
- * but it affects the throttling of the <i>next</i> request. I.e., if an expensive task
- * arrives at an idle RateLimiter, it will be granted immediately, but it is the <i>next</i>
- * request that will experience extra throttling, thus paying for the cost of the expensive
- * task.
- *
- * <p>Note: {@code RateLimiter} does not provide fairness guarantees.
- *
- * @author Dimitris Andreou
- * @since 13.0
- */
-// TODO(user): switch to nano precision. A natural unit of cost is "bytes", and a micro precision
-// would mean a maximum rate of "1MB/s", which might be small in some cases.
-@ThreadSafe
-@Beta
-public abstract class RateLimiter {
- /*
- * How is the RateLimiter designed, and why?
- *
- * The primary feature of a RateLimiter is its "stable rate", the maximum rate that
- * is should allow at normal conditions. This is enforced by "throttling" incoming
- * requests as needed, i.e. compute, for an incoming request, the appropriate throttle time,
- * and make the calling thread wait as much.
- *
- * The simplest way to maintain a rate of QPS is to keep the timestamp of the last
- * granted request, and ensure that (1/QPS) seconds have elapsed since then. For example,
- * for a rate of QPS=5 (5 tokens per second), if we ensure that a request isn't granted
- * earlier than 200ms after the the last one, then we achieve the intended rate.
- * If a request comes and the last request was granted only 100ms ago, then we wait for
- * another 100ms. At this rate, serving 15 fresh permits (i.e. for an acquire(15) request)
- * naturally takes 3 seconds.
- *
- * It is important to realize that such a RateLimiter has a very superficial memory
- * of the past: it only remembers the last request. What if the RateLimiter was unused for
- * a long period of time, then a request arrived and was immediately granted?
- * This RateLimiter would immediately forget about that past underutilization. This may
- * result in either underutilization or overflow, depending on the real world consequences
- * of not using the expected rate.
- *
- * Past underutilization could mean that excess resources are available. Then, the RateLimiter
- * should speed up for a while, to take advantage of these resources. This is important
- * when the rate is applied to networking (limiting bandwidth), where past underutilization
- * typically translates to "almost empty buffers", which can be filled immediately.
- *
- * On the other hand, past underutilization could mean that "the server responsible for
- * handling the request has become less ready for future requests", i.e. its caches become
- * stale, and requests become more likely to trigger expensive operations (a more extreme
- * case of this example is when a server has just booted, and it is mostly busy with getting
- * itself up to speed).
- *
- * To deal with such scenarios, we add an extra dimension, that of "past underutilization",
- * modeled by "storedPermits" variable. This variable is zero when there is no
- * underutilization, and it can grow up to maxStoredPermits, for sufficiently large
- * underutilization. So, the requested permits, by an invocation acquire(permits),
- * are served from:
- * - stored permits (if available)
- * - fresh permits (for any remaining permits)
- *
- * How this works is best explained with an example:
- *
- * For a RateLimiter that produces 1 token per second, every second
- * that goes by with the RateLimiter being unused, we increase storedPermits by 1.
- * Say we leave the RateLimiter unused for 10 seconds (i.e., we expected a request at time
- * X, but we are at time X + 10 seconds before a request actually arrives; this is
- * also related to the point made in the last paragraph), thus storedPermits
- * becomes 10.0 (assuming maxStoredPermits >= 10.0). At that point, a request of acquire(3)
- * arrives. We serve this request out of storedPermits, and reduce that to 7.0 (how this is
- * translated to throttling time is discussed later). Immediately after, assume that an
- * acquire(10) request arriving. We serve the request partly from storedPermits,
- * using all the remaining 7.0 permits, and the remaining 3.0, we serve them by fresh permits
- * produced by the rate limiter.
- *
- * We already know how much time it takes to serve 3 fresh permits: if the rate is
- * "1 token per second", then this will take 3 seconds. But what does it mean to serve 7
- * stored permits? As explained above, there is no unique answer. If we are primarily
- * interested to deal with underutilization, then we want stored permits to be given out
- * /faster/ than fresh ones, because underutilization = free resources for the taking.
- * If we are primarily interested to deal with overflow, then stored permits could
- * be given out /slower/ than fresh ones. Thus, we require a (different in each case)
- * function that translates storedPermits to throtting time.
- *
- * This role is played by storedPermitsToWaitTime(double storedPermits, double permitsToTake).
- * The underlying model is a continuous function mapping storedPermits
- * (from 0.0 to maxStoredPermits) onto the 1/rate (i.e. intervals) that is effective at the given
- * storedPermits. "storedPermits" essentially measure unused time; we spend unused time
- * buying/storing permits. Rate is "permits / time", thus "1 / rate = time / permits".
- * Thus, "1/rate" (time / permits) times "permits" gives time, i.e., integrals on this
- * function (which is what storedPermitsToWaitTime() computes) correspond to minimum intervals
- * between subsequent requests, for the specified number of requested permits.
- *
- * Here is an example of storedPermitsToWaitTime:
- * If storedPermits == 10.0, and we want 3 permits, we take them from storedPermits,
- * reducing them to 7.0, and compute the throttling for these as a call to
- * storedPermitsToWaitTime(storedPermits = 10.0, permitsToTake = 3.0), which will
- * evaluate the integral of the function from 7.0 to 10.0.
- *
- * Using integrals guarantees that the effect of a single acquire(3) is equivalent
- * to { acquire(1); acquire(1); acquire(1); }, or { acquire(2); acquire(1); }, etc,
- * since the integral of the function in [7.0, 10.0] is equivalent to the sum of the
- * integrals of [7.0, 8.0], [8.0, 9.0], [9.0, 10.0] (and so on), no matter
- * what the function is. This guarantees that we handle correctly requests of varying weight
- * (permits), /no matter/ what the actual function is - so we can tweak the latter freely.
- * (The only requirement, obviously, is that we can compute its integrals).
- *
- * Note well that if, for this function, we chose a horizontal line, at height of exactly
- * (1/QPS), then the effect of the function is non-existent: we serve storedPermits at
- * exactly the same cost as fresh ones (1/QPS is the cost for each). We use this trick later.
- *
- * If we pick a function that goes /below/ that horizontal line, it means that we reduce
- * the area of the function, thus time. Thus, the RateLimiter becomes /faster/ after a
- * period of underutilization. If, on the other hand, we pick a function that
- * goes /above/ that horizontal line, then it means that the area (time) is increased,
- * thus storedPermits are more costly than fresh permits, thus the RateLimiter becomes
- * /slower/ after a period of underutilization.
- *
- * Last, but not least: consider a RateLimiter with rate of 1 permit per second, currently
- * completely unused, and an expensive acquire(100) request comes. It would be nonsensical
- * to just wait for 100 seconds, and /then/ start the actual task. Why wait without doing
- * anything? A much better approach is to /allow/ the request right away (as if it was an
- * acquire(1) request instead), and postpone /subsequent/ requests as needed. In this version,
- * we allow starting the task immediately, and postpone by 100 seconds future requests,
- * thus we allow for work to get done in the meantime instead of waiting idly.
- *
- * This has important consequences: it means that the RateLimiter doesn't remember the time
- * of the _last_ request, but it remembers the (expected) time of the _next_ request. This
- * also enables us to tell immediately (see tryAcquire(timeout)) whether a particular
- * timeout is enough to get us to the point of the next scheduling time, since we always
- * maintain that. And what we mean by "an unused RateLimiter" is also defined by that
- * notion: when we observe that the "expected arrival time of the next request" is actually
- * in the past, then the difference (now - past) is the amount of time that the RateLimiter
- * was formally unused, and it is that amount of time which we translate to storedPermits.
- * (We increase storedPermits with the amount of permits that would have been produced
- * in that idle time). So, if rate == 1 permit per second, and arrivals come exactly
- * one second after the previous, then storedPermits is _never_ increased -- we would only
- * increase it for arrivals _later_ than the expected one second.
- */
-
- /**
- * Creates a {@code RateLimiter} with the specified stable throughput, given as
- * "permits per second" (commonly referred to as <i>QPS</i>, queries per second).
- *
- * <p>The returned {@code RateLimiter} ensures that on average no more than {@code
- * permitsPerSecond} are issued during any given second, with sustained requests
- * being smoothly spread over each second. When the incoming request rate exceeds
- * {@code permitsPerSecond} the rate limiter will release one permit every {@code
- * (1.0 / permitsPerSecond)} seconds. When the rate limiter is unused,
- * bursts of up to {@code permitsPerSecond} permits will be allowed, with subsequent
- * requests being smoothly limited at the stable rate of {@code permitsPerSecond}.
- *
- * @param permitsPerSecond the rate of the returned {@code RateLimiter}, measured in
- * how many permits become available per second.
- */
- public static RateLimiter create(double permitsPerSecond) {
- return create(SleepingTicker.SYSTEM_TICKER, permitsPerSecond);
- }
-
- @VisibleForTesting
- static RateLimiter create(SleepingTicker ticker, double permitsPerSecond) {
- RateLimiter rateLimiter = new Bursty(ticker);
- rateLimiter.setRate(permitsPerSecond);
- return rateLimiter;
- }
-
- /**
- * Creates a {@code RateLimiter} with the specified stable throughput, given as
- * "permits per second" (commonly referred to as <i>QPS</i>, queries per second), and a
- * <i>warmup period</i>, during which the {@code RateLimiter} smoothly ramps up its rate,
- * until it reaches its maximum rate at the end of the period (as long as there are enough
- * requests to saturate it). Similarly, if the {@code RateLimiter} is left <i>unused</i> for
- * a duration of {@code warmupPeriod}, it will gradually return to its "cold" state,
- * i.e. it will go through the same warming up process as when it was first created.
- *
- * <p>The returned {@code RateLimiter} is intended for cases where the resource that actually
- * fulfils the requests (e.g., a remote server) needs "warmup" time, rather than
- * being immediately accessed at the stable (maximum) rate.
- *
- * <p>The returned {@code RateLimiter} starts in a "cold" state (i.e. the warmup period
- * will follow), and if it is left unused for long enough, it will return to that state.
- *
- * @param permitsPerSecond the rate of the returned {@code RateLimiter}, measured in
- * how many permits become available per second
- * @param warmupPeriod the duration of the period where the {@code RateLimiter} ramps up its
- * rate, before reaching its stable (maximum) rate
- * @param unit the time unit of the warmupPeriod argument
- */
- // TODO(user): add a burst size of 1-second-worth of permits, as in the metronome?
- public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
- return create(SleepingTicker.SYSTEM_TICKER, permitsPerSecond, warmupPeriod, unit);
- }
-
- @VisibleForTesting
- static RateLimiter create(
- SleepingTicker ticker, double permitsPerSecond, long warmupPeriod, TimeUnit timeUnit) {
- RateLimiter rateLimiter = new WarmingUp(ticker, warmupPeriod, timeUnit);
- rateLimiter.setRate(permitsPerSecond);
- return rateLimiter;
- }
-
- @VisibleForTesting
- static RateLimiter createBursty(
- SleepingTicker ticker, double permitsPerSecond, int maxBurstSize) {
- Bursty rateLimiter = new Bursty(ticker);
- rateLimiter.setRate(permitsPerSecond);
- rateLimiter.maxPermits = maxBurstSize;
- return rateLimiter;
- }
-
- /**
- * The underlying timer; used both to measure elapsed time and sleep as necessary. A separate
- * object to facilitate testing.
- */
- private final SleepingTicker ticker;
-
- /**
- * The timestamp when the RateLimiter was created; used to avoid possible overflow/time-wrapping
- * errors.
- */
- private final long offsetNanos;
-
- /**
- * The currently stored permits.
- */
- double storedPermits;
-
- /**
- * The maximum number of stored permits.
- */
- double maxPermits;
-
- /**
- * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
- * per second has a stable interval of 200ms.
- */
- volatile double stableIntervalMicros;
-
- private final Object mutex = new Object();
-
- /**
- * The time when the next request (no matter its size) will be granted. After granting a request,
- * this is pushed further in the future. Large requests push this further than small requests.
- */
- private long nextFreeTicketMicros = 0L; // could be either in the past or future
-
- private RateLimiter(SleepingTicker ticker) {
- this.ticker = ticker;
- this.offsetNanos = ticker.read();
- }
-
- /**
- * Updates the stable rate of this {@code RateLimiter}, that is, the
- * {@code permitsPerSecond} argument provided in the factory method that
- * constructed the {@code RateLimiter}. Currently throttled threads will <b>not</b>
- * be awakened as a result of this invocation, thus they do not observe the new rate;
- * only subsequent requests will.
- *
- * <p>Note though that, since each request repays (by waiting, if necessary) the cost
- * of the <i>previous</i> request, this means that the very next request
- * after an invocation to {@code setRate} will not be affected by the new rate;
- * it will pay the cost of the previous request, which is in terms of the previous rate.
- *
- * <p>The behavior of the {@code RateLimiter} is not modified in any other way,
- * e.g. if the {@code RateLimiter} was configured with a warmup period of 20 seconds,
- * it still has a warmup period of 20 seconds after this method invocation.
- *
- * @param permitsPerSecond the new stable rate of this {@code RateLimiter}.
- */
- public final void setRate(double permitsPerSecond) {
- Preconditions.checkArgument(permitsPerSecond > 0.0
- && !Double.isNaN(permitsPerSecond), "rate must be positive");
- synchronized (mutex) {
- resync(readSafeMicros());
- double stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
- this.stableIntervalMicros = stableIntervalMicros;
- doSetRate(permitsPerSecond, stableIntervalMicros);
- }
- }
-
- abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros);
-
- /**
- * Returns the stable rate (as {@code permits per seconds}) with which this
- * {@code RateLimiter} is configured with. The initial value of this is the same as
- * the {@code permitsPerSecond} argument passed in the factory method that produced
- * this {@code RateLimiter}, and it is only updated after invocations
- * to {@linkplain #setRate}.
- */
- public final double getRate() {
- return TimeUnit.SECONDS.toMicros(1L) / stableIntervalMicros;
- }
-
- /**
- * Acquires a permit from this {@code RateLimiter}, blocking until the request can be granted.
- *
- * <p>This method is equivalent to {@code acquire(1)}.
- */
- public void acquire() {
- acquire(1);
- }
-
- /**
- * Acquires the given number of permits from this {@code RateLimiter}, blocking until the
- * request be granted.
- *
- * @param permits the number of permits to acquire
- */
- public void acquire(int permits) {
- checkPermits(permits);
- long microsToWait;
- synchronized (mutex) {
- microsToWait = reserveNextTicket(permits, readSafeMicros());
- }
- ticker.sleepMicrosUninterruptibly(microsToWait);
- }
-
- /**
- * Acquires a permit from this {@code RateLimiter} if it can be obtained
- * without exceeding the specified {@code timeout}, or returns {@code false}
- * immediately (without waiting) if the permit would not have been granted
- * before the timeout expired.
- *
- * <p>This method is equivalent to {@code tryAcquire(1, timeout, unit)}.
- *
- * @param timeout the maximum time to wait for the permit
- * @param unit the time unit of the timeout argument
- * @return {@code true} if the permit was acquired, {@code false} otherwise
- */
- public boolean tryAcquire(long timeout, TimeUnit unit) {
- return tryAcquire(1, timeout, unit);
- }
-
- /**
- * Acquires permits from this {@link RateLimiter} if it can be acquired immediately without delay.
- *
- * <p>
- * This method is equivalent to {@code tryAcquire(permits, 0, anyUnit)}.
- *
- * @param permits the number of permits to acquire
- * @return {@code true} if the permits were acquired, {@code false} otherwise
- * @since 14.0
- */
- public boolean tryAcquire(int permits) {
- return tryAcquire(permits, 0, TimeUnit.MICROSECONDS);
- }
-
- /**
- * Acquires a permit from this {@link RateLimiter} if it can be acquired immediately without
- * delay.
- *
- * <p>
- * This method is equivalent to {@code tryAcquire(1)}.
- *
- * @return {@code true} if the permit was acquired, {@code false} otherwise
- * @since 14.0
- */
- public boolean tryAcquire() {
- return tryAcquire(1, 0, TimeUnit.MICROSECONDS);
- }
-
- /**
- * Acquires the given number of permits from this {@code RateLimiter} if it can be obtained
- * without exceeding the specified {@code timeout}, or returns {@code false}
- * immediately (without waiting) if the permits would not have been granted
- * before the timeout expired.
- *
- * @param permits the number of permits to acquire
- * @param timeout the maximum time to wait for the permits
- * @param unit the time unit of the timeout argument
- * @return {@code true} if the permits were acquired, {@code false} otherwise
- */
- public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
- long timeoutMicros = unit.toMicros(timeout);
- checkPermits(permits);
- long microsToWait;
- synchronized (mutex) {
- long nowMicros = readSafeMicros();
- if (nextFreeTicketMicros > nowMicros + timeoutMicros) {
- return false;
- } else {
- microsToWait = reserveNextTicket(permits, nowMicros);
- }
- }
- ticker.sleepMicrosUninterruptibly(microsToWait);
- return true;
- }
-
- private static void checkPermits(int permits) {
- Preconditions.checkArgument(permits > 0, "Requested permits must be positive");
- }
-
- /**
- * Reserves next ticket and returns the wait time that the caller must wait for.
- */
- private long reserveNextTicket(double requiredPermits, long nowMicros) {
- resync(nowMicros);
- long microsToNextFreeTicket = nextFreeTicketMicros - nowMicros;
- double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits);
- double freshPermits = requiredPermits - storedPermitsToSpend;
-
- long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
- + (long) (freshPermits * stableIntervalMicros);
-
- this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
- this.storedPermits -= storedPermitsToSpend;
- return microsToNextFreeTicket;
- }
-
- /**
- * Translates a specified portion of our currently stored permits which we want to
- * spend/acquire, into a throttling time. Conceptually, this evaluates the integral
- * of the underlying function we use, for the range of
- * [(storedPermits - permitsToTake), storedPermits].
- *
- * This always holds: {@code 0 <= permitsToTake <= storedPermits}
- */
- abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);
-
- private void resync(long nowMicros) {
- // if nextFreeTicket is in the past, resync to now
- if (nowMicros > nextFreeTicketMicros) {
- storedPermits = Math.min(maxPermits,
- storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
- nextFreeTicketMicros = nowMicros;
- }
- }
-
- private long readSafeMicros() {
- return TimeUnit.NANOSECONDS.toMicros(ticker.read() - offsetNanos);
- }
-
- @Override
- public String toString() {
- return String.format("RateLimiter[stableRate=%3.1fqps]", 1000000.0 / stableIntervalMicros);
- }
-
- /**
- * This implements the following function:
- *
- * ^ throttling
- * |
- * 3*stable + /
- * interval | /.
- * (cold) | / .
- * | / . <-- "warmup period" is the area of the trapezoid between
- * 2*stable + / . halfPermits and maxPermits
- * interval | / .
- * | / .
- * | / .
- * stable +----------/ WARM . }
- * interval | . UP . } <-- this rectangle (from 0 to maxPermits, and
- * | . PERIOD. } height == stableInterval) defines the cooldown period,
- * | . . } and we want cooldownPeriod == warmupPeriod
- * |---------------------------------> storedPermits
- * (halfPermits) (maxPermits)
- *
- * Before going into the details of this particular function, let's keep in mind the basics:
- * 1) The state of the RateLimiter (storedPermits) is a vertical line in this figure.
- * 2) When the RateLimiter is not used, this goes right (up to maxPermits)
- * 3) When the RateLimiter is used, this goes left (down to zero), since if we have storedPermits,
- * we serve from those first
- * 4) When _unused_, we go right at the same speed (rate)! I.e., if our rate is
- * 2 permits per second, and 3 unused seconds pass, we will always save 6 permits
- * (no matter what our initial position was), up to maxPermits.
- * If we invert the rate, we get the "stableInterval" (interval between two requests
- * in a perfectly spaced out sequence of requests of the given rate). Thus, if you
- * want to see "how much time it will take to go from X storedPermits to X+K storedPermits?",
- * the answer is always stableInterval * K. In the same example, for 2 permits per second,
- * stableInterval is 500ms. Thus to go from X storedPermits to X+6 storedPermits, we
- * require 6 * 500ms = 3 seconds.
- *
- * In short, the time it takes to move to the right (save K permits) is equal to the
- * rectangle of width == K and height == stableInterval.
- * 4) When _used_, the time it takes, as explained in the introductory class note, is
- * equal to the integral of our function, between X permits and X-K permits, assuming
- * we want to spend K saved permits.
- *
- * In summary, the time it takes to move to the left (spend K permits), is equal to the
- * area of the function of width == K.
- *
- * Let's dive into this function now:
- *
- * When we have storedPermits <= halfPermits (the left portion of the function), then
- * we spend them at the exact same rate that
- * fresh permits would be generated anyway (that rate is 1/stableInterval). We size
- * this area to be equal to _half_ the specified warmup period. Why we need this?
- * And why half? We'll explain shortly below (after explaining the second part).
- *
- * Stored permits that are beyond halfPermits, are mapped to an ascending line, that goes
- * from stableInterval to 3 * stableInterval. The average height for that part is
- * 2 * stableInterval, and is sized appropriately to have an area _equal_ to the
- * specified warmup period. Thus, by point (4) above, it takes "warmupPeriod" amount of time
- * to go from maxPermits to halfPermits.
- *
- * BUT, by point (3) above, it only takes "warmupPeriod / 2" amount of time to return back
- * to maxPermits, from halfPermits! (Because the trapezoid has double the area of the rectangle
- * of height stableInterval and equivalent width). We decided that the "cooldown period"
- * time should be equivalent to "warmup period", thus a fully saturated RateLimiter
- * (with zero stored permits, serving only fresh ones) can go to a fully unsaturated
- * (with storedPermits == maxPermits) in the same amount of time it takes for a fully
- * unsaturated RateLimiter to return to the stableInterval -- which happens in halfPermits,
- * since beyond that point, we use a horizontal line of "stableInterval" height, simulating
- * the regular rate.
- *
- * Thus, we have figured all dimensions of this shape, to give all the desired
- * properties:
- * - the width is warmupPeriod / stableInterval, to make cooldownPeriod == warmupPeriod
- * - the slope starts at the middle, and goes from stableInterval to 3*stableInterval so
- * to have halfPermits being spend in double the usual time (half the rate), while their
- * respective rate is steadily ramping up
- */
- private static class WarmingUp extends RateLimiter {
-
- final long warmupPeriodMicros;
- /**
- * The slope of the line from the stable interval (when permits == 0), to the cold interval
- * (when permits == maxPermits)
- */
- private double slope;
- private double halfPermits;
-
- WarmingUp(SleepingTicker ticker, long warmupPeriod, TimeUnit timeUnit) {
- super(ticker);
- this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
- }
-
- @Override
- void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
- double oldMaxPermits = maxPermits;
- maxPermits = warmupPeriodMicros / stableIntervalMicros;
- halfPermits = maxPermits / 2.0;
- // Stable interval is x, cold is 3x, so on average it's 2x. Double the time -> halve the rate
- double coldIntervalMicros = stableIntervalMicros * 3.0;
- slope = (coldIntervalMicros - stableIntervalMicros) / halfPermits;
- if (oldMaxPermits == Double.POSITIVE_INFINITY) {
- // if we don't special-case this, we would get storedPermits == NaN, below
- storedPermits = 0.0;
- } else {
- storedPermits = (oldMaxPermits == 0.0)
- ? maxPermits // initial state is cold
- : storedPermits * maxPermits / oldMaxPermits;
- }
- }
-
- @Override
- long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
- double availablePermitsAboveHalf = storedPermits - halfPermits;
- long micros = 0;
- // measuring the integral on the right part of the function (the climbing line)
- if (availablePermitsAboveHalf > 0.0) {
- double permitsAboveHalfToTake = Math.min(availablePermitsAboveHalf, permitsToTake);
- micros = (long) (permitsAboveHalfToTake * (permitsToTime(availablePermitsAboveHalf)
- + permitsToTime(availablePermitsAboveHalf - permitsAboveHalfToTake)) / 2.0);
- permitsToTake -= permitsAboveHalfToTake;
- }
- // measuring the integral on the left part of the function (the horizontal line)
- micros += (stableIntervalMicros * permitsToTake);
- return micros;
- }
-
- private double permitsToTime(double permits) {
- return stableIntervalMicros + permits * slope;
- }
- }
-
- /**
- * This implements a trivial function, where storedPermits are translated to
- * zero throttling - thus, a client gets an infinite speedup for permits acquired out
- * of the storedPermits pool. This is also used for the special case of the "metronome",
- * where the width of the function is also zero; maxStoredPermits is zero, thus
- * storedPermits and permitsToTake are always zero as well. Such a RateLimiter can
- * not save permits when unused, thus all permits it serves are fresh, using the
- * designated rate.
- */
- private static class Bursty extends RateLimiter {
- Bursty(SleepingTicker ticker) {
- super(ticker);
- }
-
- @Override
- void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
- double oldMaxPermits = this.maxPermits;
- /*
- * We allow the equivalent work of up to one second to be granted with zero waiting, if the
- * rate limiter has been unused for as much. This is to avoid potentially producing tiny
- * wait interval between subsequent requests for sufficiently large rates, which would
- * unnecessarily overconstrain the thread scheduler.
- */
- maxPermits = permitsPerSecond; // one second worth of permits
- storedPermits = (oldMaxPermits == 0.0)
- ? 0.0 // initial state
- : storedPermits * maxPermits / oldMaxPermits;
- }
-
- @Override
- long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
- return 0L;
- }
- }
-
- @VisibleForTesting
- static abstract class SleepingTicker extends Ticker {
- abstract void sleepMicrosUninterruptibly(long micros);
-
- static final SleepingTicker SYSTEM_TICKER = new SleepingTicker() {
- @Override
- public long read() {
- return systemTicker().read();
- }
-
- @Override
- public void sleepMicrosUninterruptibly(long micros) {
- if (micros > 0) {
- Uninterruptibles.sleepUninterruptibly(micros, TimeUnit.MICROSECONDS);
- }
- }
- };
- }
-}
diff --git a/guava/src/com/google/common/util/concurrent/Service.java b/guava/src/com/google/common/util/concurrent/Service.java
index 861164e..9ad1f3d 100644
--- a/guava/src/com/google/common/util/concurrent/Service.java
+++ b/guava/src/com/google/common/util/concurrent/Service.java
@@ -19,58 +19,57 @@ package com.google.common.util.concurrent;
import com.google.common.annotations.Beta;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
/**
- * An object with an operational state, plus asynchronous {@link #start()} and {@link #stop()}
- * lifecycle methods to transition between states. Example services include webservers, RPC servers
- * and timers.
- *
- * <p>The normal lifecycle of a service is:
+ * An object with an operational state, plus asynchronous {@link #start()} and
+ * {@link #stop()} lifecycle methods to transfer into and out of this state.
+ * Example services include webservers, RPC servers and timers. The normal
+ * lifecycle of a service is:
* <ul>
- * <li>{@linkplain State#NEW NEW} -&gt;
- * <li>{@linkplain State#STARTING STARTING} -&gt;
- * <li>{@linkplain State#RUNNING RUNNING} -&gt;
- * <li>{@linkplain State#STOPPING STOPPING} -&gt;
- * <li>{@linkplain State#TERMINATED TERMINATED}
+ * <li>{@link State#NEW} -&gt;</li>
+ * <li>{@link State#STARTING} -&gt;</li>
+ * <li>{@link State#RUNNING} -&gt;</li>
+ * <li>{@link State#STOPPING} -&gt;</li>
+ * <li>{@link State#TERMINATED}</li>
* </ul>
*
- * <p>There are deviations from this if there are failures or if {@link Service#stop} is called
- * before the {@link Service} reaches the {@linkplain State#RUNNING RUNNING} state. The set of legal
- * transitions form a <a href="http://en.wikipedia.org/wiki/Directed_acyclic_graph">DAG</a>,
- * therefore every method of the listener will be called at most once. N.B. The {@link State#FAILED}
- * and {@link State#TERMINATED} states are terminal states, once a service enters either of these
- * states it cannot ever leave them.
+ * If the service fails while starting, running or stopping, its state will be
+ * {@link State#FAILED}, and its behavior is undefined. Such a service cannot be
+ * started nor stopped.
*
- * <p>Implementors of this interface are strongly encouraged to extend one of the abstract classes
- * in this package which implement this interface and make the threading and state management
- * easier.
+ * <p>Implementors of this interface are strongly encouraged to extend one of
+ * the abstract classes in this package which implement this interface and
+ * make the threading and state management easier.
*
* @author Jesse Wilson
- * @author Luke Sandberg
- * @since 9.0 (in 1.0 as {@code com.google.common.base.Service})
+ * @since 9.0 (in 1.0 as
+ * {@code com.google.common.base.Service})
*/
-@Beta
+@Beta // TODO(kevinb): make abstract class?
public interface Service {
/**
- * If the service state is {@link State#NEW}, this initiates service startup and returns
- * immediately. If the service has already been started, this method returns immediately without
- * taking action. A stopped service may not be restarted.
+ * If the service state is {@link State#NEW}, this initiates service startup
+ * and returns immediately. If the service has already been started, this
+ * method returns immediately without taking action. A stopped service may not
+ * be restarted.
*
- * @return a future for the startup result, regardless of whether this call initiated startup.
- * Calling {@link ListenableFuture#get} will block until the service has finished
- * starting, and returns one of {@link State#RUNNING}, {@link State#STOPPING} or
- * {@link State#TERMINATED}. If the service fails to start, {@link ListenableFuture#get}
- * will throw an {@link ExecutionException}, and the service's state will be
- * {@link State#FAILED}. If it has already finished starting, {@link ListenableFuture#get}
- * returns immediately. Cancelling this future has no effect on the service.
+ * @return a future for the startup result, regardless of whether this call
+ * initiated startup. Calling {@link ListenableFuture#get} will block
+ * until the service has finished starting, and returns one of {@link
+ * State#RUNNING}, {@link State#STOPPING} or {@link State#TERMINATED}. If
+ * the service fails to start, {@link ListenableFuture#get} will throw an
+ * {@link ExecutionException}, and the service's state will be {@link
+ * State#FAILED}. If it has already finished starting, {@link
+ * ListenableFuture#get} returns immediately. Cancelling this future has
+ * no effect on the service.
*/
ListenableFuture<State> start();
/**
- * Initiates service startup (if necessary), returning once the service has finished starting.
- * Unlike calling {@code start().get()}, this method throws no checked exceptions, and it cannot
- * be {@linkplain Thread#interrupt interrupted}.
+ * Initiates service startup (if necessary), returning once the service has
+ * finished starting. Unlike calling {@code start().get()}, this method throws
+ * no checked exceptions, and it cannot be {@linkplain Thread#interrupt
+ * interrupted}.
*
* @throws UncheckedExecutionException if startup failed
* @return the state of the service when startup finished.
@@ -88,67 +87,39 @@ public interface Service {
State state();
/**
- * If the service is {@linkplain State#STARTING starting} or {@linkplain State#RUNNING running},
- * this initiates service shutdown and returns immediately. If the service is
- * {@linkplain State#NEW new}, it is {@linkplain State#TERMINATED terminated} without having been
- * started nor stopped. If the service has already been stopped, this method returns immediately
- * without taking action.
+ * If the service is {@linkplain State#STARTING starting} or {@linkplain
+ * State#RUNNING running}, this initiates service shutdown and returns
+ * immediately. If the service is {@linkplain State#NEW new}, it is
+ * {@linkplain State#TERMINATED terminated} without having been started nor
+ * stopped. If the service has already been stopped, this method returns
+ * immediately without taking action.
*
- * @return a future for the shutdown result, regardless of whether this call initiated shutdown.
- * Calling {@link ListenableFuture#get} will block until the service has finished shutting
- * down, and either returns {@link State#TERMINATED} or throws an
- * {@link ExecutionException}. If it has already finished stopping,
- * {@link ListenableFuture#get} returns immediately. Cancelling this future has no effect
- * on the service.
+ * @return a future for the shutdown result, regardless of whether this call
+ * initiated shutdown. Calling {@link ListenableFuture#get} will block
+ * until the service has finished shutting down, and either returns
+ * {@link State#TERMINATED} or throws an {@link ExecutionException}. If
+ * it has already finished stopping, {@link ListenableFuture#get} returns
+ * immediately. Cancelling this future has no effect on the service.
*/
ListenableFuture<State> stop();
/**
- * Initiates service shutdown (if necessary), returning once the service has finished stopping. If
- * this is {@link State#STARTING}, startup will be cancelled. If this is {@link State#NEW}, it is
- * {@link State#TERMINATED terminated} without having been started nor stopped. Unlike calling
- * {@code stop().get()}, this method throws no checked exceptions.
+ * Initiates service shutdown (if necessary), returning once the service has
+ * finished stopping. If this is {@link State#STARTING}, startup will be
+ * cancelled. If this is {@link State#NEW}, it is {@link State#TERMINATED
+ * terminated} without having been started nor stopped. Unlike calling {@code
+ * stop().get()}, this method throws no checked exceptions.
*
- * @throws UncheckedExecutionException if the service has failed or fails during shutdown
+ * @throws UncheckedExecutionException if shutdown failed
* @return the state of the service when shutdown finished.
*/
State stopAndWait();
/**
- * Returns the {@link Throwable} that caused this service to fail.
- *
- * @throws IllegalStateException if this service's state isn't {@linkplain State#FAILED FAILED}.
- *
- * @since 14.0
- */
- Throwable failureCause();
-
- /**
- * Registers a {@link Listener} to be {@linkplain Executor#execute executed} on the given
- * executor. The listener will have the corresponding transition method called whenever the
- * service changes state. The listener will not have previous state changes replayed, so it is
- * suggested that listeners are added before the service starts.
- *
- * <p>There is no guaranteed ordering of execution of listeners, but any listener added through
- * this method is guaranteed to be called whenever there is a state change.
- *
- * <p>Exceptions thrown by a listener will be propagated up to the executor. Any exception thrown
- * during {@code Executor.execute} (e.g., a {@code RejectedExecutionException} or an exception
- * thrown by {@linkplain MoreExecutors#sameThreadExecutor inline execution}) will be caught and
- * logged.
- *
- * @param listener the listener to run when the service changes state is complete
- * @param executor the executor in which the the listeners callback methods will be run. For fast,
- * lightweight listeners that would be safe to execute in any thread, consider
- * {@link MoreExecutors#sameThreadExecutor}.
- * @since 13.0
- */
- void addListener(Listener listener, Executor executor);
-
- /**
* The lifecycle states of a service.
*
- * @since 9.0 (in 1.0 as {@code com.google.common.base.Service.State})
+ * @since 9.0 (in 1.0 as
+ * {@code com.google.common.base.Service.State})
*/
@Beta // should come out of Beta when Service does
enum State {
@@ -174,70 +145,15 @@ public interface Service {
STOPPING,
/**
- * A service in this state has completed execution normally. It does minimal work and consumes
- * minimal resources.
+ * A service in this state has completed execution normally. It does minimal
+ * work and consumes minimal resources.
*/
TERMINATED,
/**
- * A service in this state has encountered a problem and may not be operational. It cannot be
- * started nor stopped.
+ * A service in this state has encountered a problem and may not be
+ * operational. It cannot be started nor stopped.
*/
FAILED
}
-
- /**
- * A listener for the various state changes that a {@link Service} goes through in its lifecycle.
- *
- * @author Luke Sandberg
- * @since 13.0
- */
- @Beta // should come out of Beta when Service does
- interface Listener {
- /**
- * Called when the service transitions from {@linkplain State#NEW NEW} to
- * {@linkplain State#STARTING STARTING}. This occurs when {@link Service#start} or
- * {@link Service#startAndWait} is called the first time.
- */
- void starting();
-
- /**
- * Called when the service transitions from {@linkplain State#STARTING STARTING} to
- * {@linkplain State#RUNNING RUNNING}. This occurs when a service has successfully started.
- */
- void running();
-
- /**
- * Called when the service transitions to the {@linkplain State#STOPPING STOPPING} state. The
- * only valid values for {@code from} are {@linkplain State#STARTING STARTING} or
- * {@linkplain State#RUNNING RUNNING}. This occurs when {@link Service#stop} is called.
- *
- * @param from The previous state that is being transitioned from.
- */
- void stopping(State from);
-
- /**
- * Called when the service transitions to the {@linkplain State#TERMINATED TERMINATED} state.
- * The {@linkplain State#TERMINATED TERMINATED} state is a terminal state in the transition
- * diagram. Therefore, if this method is called, no other methods will be called on the
- * {@link Listener}.
- *
- * @param from The previous state that is being transitioned from. The only valid values for
- * this are {@linkplain State#NEW NEW}, {@linkplain State#RUNNING RUNNING} or
- * {@linkplain State#STOPPING STOPPING}.
- */
- void terminated(State from);
-
- /**
- * Called when the service transitions to the {@linkplain State#FAILED FAILED} state. The
- * {@linkplain State#FAILED FAILED} state is a terminal state in the transition diagram.
- * Therefore, if this method is called, no other methods will be called on the {@link Listener}.
- *
- * @param from The previous state that is being transitioned from. Failure can occur in any
- * state with the exception of {@linkplain State#NEW NEW} or
- * {@linkplain State#TERMINATED TERMINATED}.
- * @param failure The exception that caused the failure.
- */
- void failed(State from, Throwable failure);
- }
}
diff --git a/guava/src/com/google/common/util/concurrent/ServiceManager.java b/guava/src/com/google/common/util/concurrent/ServiceManager.java
deleted file mode 100644
index c779b23..0000000
--- a/guava/src/com/google/common/util/concurrent/ServiceManager.java
+++ /dev/null
@@ -1,724 +0,0 @@
-/*
- * Copyright (C) 2012 The Guava Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.common.util.concurrent;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-import com.google.common.annotations.Beta;
-import com.google.common.base.Function;
-import com.google.common.base.Objects;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
-import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.Service.State;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.Immutable;
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
-/**
- * A manager for monitoring and controlling a set of {@link Service services}. This class provides
- * methods for {@linkplain #startAsync() starting}, {@linkplain #stopAsync() stopping} and
- * {@linkplain #servicesByState inspecting} a collection of {@linkplain Service services}.
- * Additionally, users can monitor state transitions with the {@link Listener listener} mechanism.
- *
- * <p>While it is recommended that service lifecycles be managed via this class, state transitions
- * initiated via other mechanisms do not impact the correctness of its methods. For example, if the
- * services are started by some mechanism besides {@link #startAsync}, the listeners will be invoked
- * when appropriate and {@link #awaitHealthy} will still work as expected.
- *
- * <p>Here is a simple example of how to use a {@link ServiceManager} to start a server.
- * <pre> {@code
- * class Server {
- * public static void main(String[] args) {
- * Set<Service> services = ...;
- * ServiceManager manager = new ServiceManager(services);
- * manager.addListener(new Listener() {
- * public void stopped() {}
- * public void healthy() {
- * // Services have been initialized and are healthy, start accepting requests...
- * }
- * public void failure(Service service) {
- * // Something failed, at this point we could log it, notify a load balancer, or take
- * // some other action. For now we will just exit.
- * System.exit(1);
- * }
- * },
- * MoreExecutors.sameThreadExecutor());
- *
- * Runtime.getRuntime().addShutdownHook(new Thread() {
- * public void run() {
- * // Give the services 5 seconds to stop to ensure that we are responsive to shutdown
- * // requests.
- * try {
- * manager.stopAsync().awaitStopped(5, TimeUnit.SECONDS);
- * } catch (TimeoutException timeout) {
- * // stopping timed out
- * }
- * }
- * });
- * manager.startAsync(); // start all the services asynchronously
- * }
- * }}</pre>
- *
- * This class uses the ServiceManager's methods to start all of its services, to respond to service
- * failure and to ensure that when the JVM is shutting down all the services are stopped.
- *
- * @author Luke Sandberg
- * @since 14.0
- */
-@Beta
-@Singleton
-public final class ServiceManager {
- private static final Logger logger = Logger.getLogger(ServiceManager.class.getName());
-
- /**
- * A listener for the aggregate state changes of the services that are under management. Users
- * that need to listen to more fine-grained events (such as when each particular
- * {@link Service service} starts, or terminates), should attach {@link Service.Listener service
- * listeners} to each individual service.
- *
- * @author Luke Sandberg
- * @since 14.0
- */
- @Beta // Should come out of Beta when ServiceManager does
- public static interface Listener {
- /**
- * Called when the service initially becomes healthy.
- *
- * <p>This will be called at most once after all the services have entered the
- * {@linkplain State#RUNNING running} state. If any services fail during start up or
- * {@linkplain State#FAILED fail}/{@linkplain State#TERMINATED terminate} before all other
- * services have started {@linkplain State#RUNNING running} then this method will not be called.
- */
- void healthy();
-
- /**
- * Called when the all of the component services have reached a terminal state, either
- * {@linkplain State#TERMINATED terminated} or {@linkplain State#FAILED failed}.
- */
- void stopped();
-
- /**
- * Called when a component service has {@linkplain State#FAILED failed}.
- *
- * @param service The service that failed.
- */
- void failure(Service service);
- }
-
- /**
- * An encapsulation of all of the state that is accessed by the {@linkplain ServiceListener
- * service listeners}. This is extracted into its own object so that {@link ServiceListener}
- * could be made {@code static} and its instances can be safely constructed and added in the
- * {@link ServiceManager} constructor without having to close over the partially constructed
- * {@link ServiceManager} instance (i.e. avoid leaking a pointer to {@code this}).
- */
- private final ServiceManagerState state;
- private final ImmutableMap<Service, ServiceListener> services;
-
- /**
- * Constructs a new instance for managing the given services.
- *
- * @param services The services to manage
- *
- * @throws IllegalArgumentException if not all services are {@link State#NEW new} or if there are
- * any duplicate services.
- */
- public ServiceManager(Iterable<? extends Service> services) {
- ImmutableList<Service> copy = ImmutableList.copyOf(services);
- this.state = new ServiceManagerState(copy.size());
- ImmutableMap.Builder<Service, ServiceListener> builder = ImmutableMap.builder();
- Executor executor = MoreExecutors.sameThreadExecutor();
- for (Service service : copy) {
- ServiceListener listener = new ServiceListener(service, state);
- service.addListener(listener, executor);
- // We check the state after adding the listener as a way to ensure that our listener was added
- // to a NEW service.
- checkArgument(service.state() == State.NEW, "Can only manage NEW services, %s", service);
- builder.put(service, listener);
- }
- this.services = builder.build();
- }
-
- /**
- * Constructs a new instance for managing the given services. This constructor is provided so that
- * dependency injection frameworks can inject instances of {@link ServiceManager}.
- *
- * @param services The services to manage
- *
- * @throws IllegalStateException if not all services are {@link State#NEW new}.
- */
- @Inject ServiceManager(Set<Service> services) {
- this((Iterable<Service>) services);
- }
-
- /**
- * Registers a {@link Listener} to be {@linkplain Executor#execute executed} on the given
- * executor. The listener will not have previous state changes replayed, so it is
- * suggested that listeners are added before any of the managed services are
- * {@linkplain Service#start started}.
- *
- * <p>There is no guaranteed ordering of execution of listeners, but any listener added through
- * this method is guaranteed to be called whenever there is a state change.
- *
- * <p>Exceptions thrown by a listener will be propagated up to the executor. Any exception thrown
- * during {@code Executor.execute} (e.g., a {@code RejectedExecutionException} or an exception
- * thrown by {@linkplain MoreExecutors#sameThreadExecutor inline execution}) will be caught and
- * logged.
- *
- * @param listener the listener to run when the manager changes state
- * @param executor the executor in which the the listeners callback methods will be run. For fast,
- * lightweight listeners that would be safe to execute in any thread, consider
- * {@link MoreExecutors#sameThreadExecutor}.
- */
- public void addListener(Listener listener, Executor executor) {
- state.addListener(listener, executor);
- }
-
- /**
- * Initiates service {@linkplain Service#start startup} on all the services being managed. It is
- * only valid to call this method if all of the services are {@linkplain State#NEW new}.
- *
- * @return this
- * @throws IllegalStateException if any of the Services are not {@link State#NEW new} when the
- * method is called, {@link State#TERMINATED terminated} or {@link State#FAILED failed}.
- */
- public ServiceManager startAsync() {
- for (Map.Entry<Service, ServiceListener> entry : services.entrySet()) {
- Service service = entry.getKey();
- State state = service.state();
- checkState(state == State.NEW, "Service %s is %s, cannot start it.", service,
- state);
- }
- for (ServiceListener service : services.values()) {
- service.start();
- }
- return this;
- }
-
- /**
- * Waits for the {@link ServiceManager} to become {@linkplain #isHealthy() healthy}. The manager
- * will become healthy after all the component services have reached the {@linkplain State#RUNNING
- * running} state.
- *
- * @throws IllegalStateException if the service manager reaches a state from which it cannot
- * become {@linkplain #isHealthy() healthy}.
- */
- public void awaitHealthy() {
- state.awaitHealthy();
- checkState(isHealthy(), "Expected to be healthy after starting");
- }
-
- /**
- * Waits for the {@link ServiceManager} to become {@linkplain #isHealthy() healthy} for no more
- * than the given time. The manager will become healthy after all the component services have
- * reached the {@linkplain State#RUNNING running} state.
- *
- * @param timeout the maximum time to wait
- * @param unit the time unit of the timeout argument
- * @throws TimeoutException if not all of the services have finished starting within the deadline
- * @throws IllegalStateException if the service manager reaches a state from which it cannot
- * become {@linkplain #isHealthy() healthy}.
- */
- public void awaitHealthy(long timeout, TimeUnit unit) throws TimeoutException {
- if (!state.awaitHealthy(timeout, unit)) {
- // It would be nice to tell the caller who we are still waiting on, and this information is
- // likely to be in servicesByState(), however due to race conditions we can't actually tell
- // which services are holding up healthiness. The current set of NEW or STARTING services is
- // likely to point out the culprit, but may not. If we really wanted to solve this we could
- // change state to track exactly which services have started and then we could accurately
- // report on this. But it is only for logging so we likely don't care.
- throw new TimeoutException("Timeout waiting for the services to become healthy.");
- }
- checkState(isHealthy(), "Expected to be healthy after starting");
- }
-
- /**
- * Initiates service {@linkplain Service#stop shutdown} if necessary on all the services being
- * managed.
- *
- * @return this
- */
- public ServiceManager stopAsync() {
- for (Service service : services.keySet()) {
- service.stop();
- }
- return this;
- }
-
- /**
- * Waits for the all the services to reach a terminal state. After this method returns all
- * services will either be {@link Service.State#TERMINATED terminated} or
- * {@link Service.State#FAILED failed}
- */
- public void awaitStopped() {
- state.awaitStopped();
- }
-
- /**
- * Waits for the all the services to reach a terminal state for no more than the given time. After
- * this method returns all services will either be {@link Service.State#TERMINATED terminated} or
- * {@link Service.State#FAILED failed}
- *
- * @param timeout the maximum time to wait
- * @param unit the time unit of the timeout argument
- * @throws TimeoutException if not all of the services have stopped within the deadline
- */
- public void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException {
- if (!state.awaitStopped(timeout, unit)) {
- throw new TimeoutException("Timeout waiting for the services to stop.");
- }
- }
-
- /**
- * Returns true if all services are currently in the {@linkplain State#RUNNING running} state.
- *
- * <p>Users who want more detailed information should use the {@link #servicesByState} method to
- * get detailed information about which services are not running.
- */
- public boolean isHealthy() {
- for (Service service : services.keySet()) {
- if (!service.isRunning()) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Provides a snapshot of the current state of all the services under management.
- *
- * <p>N.B. This snapshot it not guaranteed to be consistent, i.e. the set of states returned may
- * not correspond to any particular point in time view of the services.
- */
- public ImmutableMultimap<State, Service> servicesByState() {
- ImmutableMultimap.Builder<State, Service> builder = ImmutableMultimap.builder();
- for (Service service : services.keySet()) {
- builder.put(service.state(), service);
- }
- return builder.build();
- }
-
- /**
- * Returns the service load times. This value will only return startup times for services that
- * have finished starting.
- *
- * @return Map of services and their corresponding startup time in millis, the map entries will be
- * ordered by startup time.
- */
- public ImmutableMap<Service, Long> startupTimes() {
- Map<Service, Long> loadTimeMap = Maps.newHashMapWithExpectedSize(services.size());
- for (Map.Entry<Service, ServiceListener> entry : services.entrySet()) {
- State state = entry.getKey().state();
- if (state != State.NEW && state != State.STARTING) {
- loadTimeMap.put(entry.getKey(), entry.getValue().startupTimeMillis());
- }
- }
- List<Entry<Service, Long>> servicesByStartTime = Ordering.<Long>natural()
- .onResultOf(new Function<Map.Entry<Service, Long>, Long>() {
- @Override public Long apply(Map.Entry<Service, Long> input) {
- return input.getValue();
- }
- })
- .sortedCopy(loadTimeMap.entrySet());
- ImmutableMap.Builder<Service, Long> builder = ImmutableMap.builder();
- for (Map.Entry<Service, Long> entry : servicesByStartTime) {
- builder.put(entry);
- }
- return builder.build();
- }
-
- @Override public String toString() {
- return Objects.toStringHelper(ServiceManager.class)
- .add("services", services.keySet())
- .toString();
- }
-
- /**
- * An encapsulation of all the mutable state of the {@link ServiceManager} that needs to be
- * accessed by instances of {@link ServiceListener}.
- */
- private static final class ServiceManagerState {
- final Monitor monitor = new Monitor();
- final int numberOfServices;
- /** The number of services that have not finished starting up. */
- @GuardedBy("monitor")
- int unstartedServices;
- /** The number of services that have not reached a terminal state. */
- @GuardedBy("monitor")
- int unstoppedServices;
- /**
- * Controls how long to wait for all the service manager to either become healthy or reach a
- * state where it is guaranteed that it can never become healthy.
- */
- final Monitor.Guard awaitHealthGuard = new Monitor.Guard(monitor) {
- @Override public boolean isSatisfied() {
- // All services have started or some service has terminated/failed.
- return unstartedServices == 0 || unstoppedServices != numberOfServices;
- }
- };
- /**
- * Controls how long to wait for all services to reach a terminal state.
- */
- final Monitor.Guard stoppedGuard = new Monitor.Guard(monitor) {
- @Override public boolean isSatisfied() {
- return unstoppedServices == 0;
- }
- };
- /** The listeners to notify during a state transition. */
- @GuardedBy("monitor")
- final List<ListenerExecutorPair> listeners = Lists.newArrayList();
- /**
- * The queue of listeners that are waiting to be executed.
- *
- * <p>Enqueue operations should be protected by {@link #monitor} while dequeue operations should
- * be protected by the implicit lock on this object. This is to ensure that listeners are
- * executed in the correct order and also so that a listener can not hold the {@link #monitor}
- * for an arbitrary amount of time (listeners can only block other listeners, not internal state
- * transitions). We use a concurrent queue implementation so that enqueues can be executed
- * concurrently with dequeues.
- */
- @GuardedBy("queuedListeners")
- final Queue<Runnable> queuedListeners = Queues.newConcurrentLinkedQueue();
-
- ServiceManagerState(int numberOfServices) {
- this.numberOfServices = numberOfServices;
- this.unstoppedServices = numberOfServices;
- this.unstartedServices = numberOfServices;
- }
-
- void addListener(Listener listener, Executor executor) {
- checkNotNull(listener, "listener");
- checkNotNull(executor, "executor");
- monitor.enter();
- try {
- // no point in adding a listener that will never be called
- if (unstartedServices > 0 || unstoppedServices > 0) {
- listeners.add(new ListenerExecutorPair(listener, executor));
- }
- } finally {
- monitor.leave();
- }
- }
-
- void awaitHealthy() {
- monitor.enter();
- try {
- monitor.waitForUninterruptibly(awaitHealthGuard);
- } finally {
- monitor.leave();
- }
- }
-
- boolean awaitHealthy(long timeout, TimeUnit unit) {
- monitor.enter();
- try {
- if (monitor.waitForUninterruptibly(awaitHealthGuard, timeout, unit)) {
- return true;
- }
- return false;
- } finally {
- monitor.leave();
- }
- }
-
- void awaitStopped() {
- monitor.enter();
- try {
- monitor.waitForUninterruptibly(stoppedGuard);
- } finally {
- monitor.leave();
- }
- }
-
- boolean awaitStopped(long timeout, TimeUnit unit) {
- monitor.enter();
- try {
- return monitor.waitForUninterruptibly(stoppedGuard, timeout, unit);
- } finally {
- monitor.leave();
- }
- }
-
- /**
- * This should be called when a service finishes starting up.
- *
- * @param currentlyHealthy whether or not the service that finished starting was healthy at the
- * time that it finished starting.
- */
- @GuardedBy("monitor")
- private void serviceFinishedStarting(Service service, boolean currentlyHealthy) {
- checkState(unstartedServices > 0,
- "All services should have already finished starting but %s just finished.", service);
- unstartedServices--;
- if (currentlyHealthy && unstartedServices == 0 && unstoppedServices == numberOfServices) {
- // This means that the manager is currently healthy, or at least it should have been
- // healthy at some point from some perspective. Calling isHealthy is not currently
- // guaranteed to return true because any service could fail right now. However, the
- // happens-before relationship enforced by the monitor ensures that this method was called
- // before either serviceTerminated or serviceFailed, so we know that the manager was at
- // least healthy for some period of time. Furthermore we are guaranteed that this call to
- // healthy() will be before any call to terminated() or failure(Service) on the listener.
- // So it is correct to execute the listener's health() callback.
- for (final ListenerExecutorPair pair : listeners) {
- queuedListeners.add(new Runnable() {
- @Override public void run() {
- pair.execute(new Runnable() {
- @Override public void run() {
- pair.listener.healthy();
- }
- });
- }
- });
- }
- }
- }
-
- /**
- * This should be called when a service is {@linkplain State#TERMINATED terminated}.
- */
- @GuardedBy("monitor")
- private void serviceTerminated(Service service) {
- serviceStopped(service);
- }
-
- /**
- * This should be called when a service is {@linkplain State#FAILED failed}.
- */
- @GuardedBy("monitor")
- private void serviceFailed(final Service service) {
- for (final ListenerExecutorPair pair : listeners) {
- queuedListeners.add(new Runnable() {
- @Override public void run() {
- pair.execute(new Runnable() {
- @Override public void run() {
- pair.listener.failure(service);
- }
- });
- }
- });
- }
- serviceStopped(service);
- }
-
- /**
- * Should be called whenever a service reaches a terminal state (
- * {@linkplain State#TERMINATED terminated} or
- * {@linkplain State#FAILED failed}).
- */
- @GuardedBy("monitor")
- private void serviceStopped(Service service) {
- checkState(unstoppedServices > 0,
- "All services should have already stopped but %s just stopped.", service);
- unstoppedServices--;
- if (unstoppedServices == 0) {
- checkState(unstartedServices == 0,
- "All services are stopped but %d services haven't finished starting",
- unstartedServices);
- for (final ListenerExecutorPair pair : listeners) {
- queuedListeners.add(new Runnable() {
- @Override public void run() {
- pair.execute(new Runnable() {
- @Override public void run() {
- pair.listener.stopped();
- }
- });
- }
- });
- }
- // no more listeners could possibly be called, so clear them out
- listeners.clear();
- }
- }
-
- /**
- * Attempts to execute all the listeners in {@link #queuedListeners}.
- */
- private void executeListeners() {
- checkState(!monitor.isOccupiedByCurrentThread(),
- "It is incorrect to execute listeners with the monitor held.");
- synchronized (queuedListeners) {
- Runnable listener;
- while ((listener = queuedListeners.poll()) != null) {
- listener.run();
- }
- }
- }
- }
-
- /**
- * A {@link Service} that wraps another service and times how long it takes for it to start and
- * also calls the {@link ServiceManagerState#serviceFinishedStarting},
- * {@link ServiceManagerState#serviceTerminated} and {@link ServiceManagerState#serviceFailed}
- * according to its current state.
- */
- private static final class ServiceListener implements Service.Listener {
- @GuardedBy("watch") // AFAICT Stopwatch is not thread safe so we need to protect accesses
- final Stopwatch watch = new Stopwatch();
- final Service service;
- final ServiceManagerState state;
-
- /**
- * @param service the service that
- */
- ServiceListener(Service service, ServiceManagerState state) {
- this.service = service;
- this.state = state;
- }
-
- @Override public void starting() {
- // This can happen if someone besides the ServiceManager starts the service, in this case
- // our timings may be inaccurate.
- startTimer();
- }
-
- @Override public void running() {
- state.monitor.enter();
- try {
- finishedStarting(true);
- } finally {
- state.monitor.leave();
- state.executeListeners();
- }
- }
-
- @Override public void stopping(State from) {
- if (from == State.STARTING) {
- state.monitor.enter();
- try {
- finishedStarting(false);
- } finally {
- state.monitor.leave();
- state.executeListeners();
- }
- }
- }
-
- @Override public void terminated(State from) {
- logger.info("Service " + service + " has terminated. Previous state was " + from + " state.");
- state.monitor.enter();
- try {
- if (from == State.NEW) {
- // startTimer is idempotent, so this is safe to call and it may be necessary if no one has
- // started the timer yet.
- startTimer();
- finishedStarting(false);
- }
- state.serviceTerminated(service);
- } finally {
- state.monitor.leave();
- state.executeListeners();
- }
- }
-
- @Override public void failed(State from, Throwable failure) {
- logger.log(Level.SEVERE, "Service " + service + " has failed in the " + from + " state.",
- failure);
- state.monitor.enter();
- try {
- if (from == State.STARTING) {
- finishedStarting(false);
- }
- state.serviceFailed(service);
- } finally {
- state.monitor.leave();
- state.executeListeners();
- }
- }
-
- /**
- * Stop the stopwatch, log the startup time and decrement the startup latch
- *
- * @param currentlyHealthy whether or not the service that finished starting is currently
- * healthy
- */
- @GuardedBy("monitor")
- void finishedStarting(boolean currentlyHealthy) {
- synchronized (watch) {
- watch.stop();
- logger.log(Level.INFO, "Started " + service + " in " + startupTimeMillis() + " ms.");
- }
- state.serviceFinishedStarting(service, currentlyHealthy);
- }
-
- void start() {
- startTimer();
- service.start();
- }
-
- /** Start the timer if it hasn't been started. */
- void startTimer() {
- synchronized (watch) {
- if (!watch.isRunning()) { // only start the watch once.
- watch.start();
- logger.log(Level.INFO, "Starting {0}", service);
- }
- }
- }
-
- /** Returns the amount of time it took for the service to finish starting in milliseconds. */
- synchronized long startupTimeMillis() {
- synchronized (watch) {
- return watch.elapsed(MILLISECONDS);
- }
- }
- }
-
- /** Simple value object binding a listener to its executor. */
- @Immutable private static final class ListenerExecutorPair {
- final Listener listener;
- final Executor executor;
-
- ListenerExecutorPair(Listener listener, Executor executor) {
- this.listener = listener;
- this.executor = executor;
- }
-
- /**
- * Executes the given {@link Runnable} on {@link #executor} logging and swallowing all
- * exceptions
- */
- void execute(Runnable runnable) {
- try {
- executor.execute(runnable);
- } catch (Exception e) {
- logger.log(Level.SEVERE, "Exception while executing listener " + listener
- + " with executor " + executor, e);
- }
- }
- }
-}
diff --git a/guava/src/com/google/common/util/concurrent/Striped.java b/guava/src/com/google/common/util/concurrent/Striped.java
deleted file mode 100644
index 3c426f0..0000000
--- a/guava/src/com/google/common/util/concurrent/Striped.java
+++ /dev/null
@@ -1,376 +0,0 @@
-/*
- * Copyright (C) 2011 The Guava Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.common.util.concurrent;
-
-import com.google.common.annotations.Beta;
-import com.google.common.base.Functions;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.MapMaker;
-import com.google.common.math.IntMath;
-import com.google.common.primitives.Ints;
-
-import java.math.RoundingMode;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * A striped {@code Lock/Semaphore/ReadWriteLock}. This offers the underlying lock striping
- * similar to that of {@code ConcurrentHashMap} in a reusable form, and extends it for
- * semaphores and read-write locks. Conceptually, lock striping is the technique of dividing a lock
- * into many <i>stripes</i>, increasing the granularity of a single lock and allowing independent
- * operations to lock different stripes and proceed concurrently, instead of creating contention
- * for a single lock.
- *
- * <p>The guarantee provided by this class is that equal keys lead to the same lock (or semaphore),
- * i.e. {@code if (key1.equals(key2))} then {@code striped.get(key1) == striped.get(key2)}
- * (assuming {@link Object#hashCode()} is correctly implemented for the keys). Note
- * that if {@code key1} is <strong>not</strong> equal to {@code key2}, it is <strong>not</strong>
- * guaranteed that {@code striped.get(key1) != striped.get(key2)}; the elements might nevertheless
- * be mapped to the same lock. The lower the number of stripes, the higher the probability of this
- * happening.
- *
- * <p>There are three flavors of this class: {@code Striped<Lock>}, {@code Striped<Semaphore>},
- * and {@code Striped<ReadWriteLock>}. For each type, two implementations are offered:
- * {@linkplain #lock(int) strong} and {@linkplain #lazyWeakLock(int) weak}
- * {@code Striped<Lock>}, {@linkplain #semaphore(int, int) strong} and {@linkplain
- * #lazyWeakSemaphore(int, int) weak} {@code Striped<Semaphore>}, and {@linkplain
- * #readWriteLock(int) strong} and {@linkplain #lazyWeakReadWriteLock(int) weak}
- * {@code Striped<ReadWriteLock>}. <i>Strong</i> means that all stripes (locks/semaphores) are
- * initialized eagerly, and are not reclaimed unless {@code Striped} itself is reclaimable.
- * <i>Weak</i> means that locks/semaphores are created lazily, and they are allowed to be reclaimed
- * if nobody is holding on to them. This is useful, for example, if one wants to create a {@code
- * Striped<Lock>} of many locks, but worries that in most cases only a small portion of these
- * would be in use.
- *
- * <p>Prior to this class, one might be tempted to use {@code Map<K, Lock>}, where {@code K}
- * represents the task. This maximizes concurrency by having each unique key mapped to a unique
- * lock, but also maximizes memory footprint. On the other extreme, one could use a single lock
- * for all tasks, which minimizes memory footprint but also minimizes concurrency. Instead of
- * choosing either of these extremes, {@code Striped} allows the user to trade between required
- * concurrency and memory footprint. For example, if a set of tasks are CPU-bound, one could easily
- * create a very compact {@code Striped<Lock>} of {@code availableProcessors() * 4} stripes,
- * instead of possibly thousands of locks which could be created in a {@code Map<K, Lock>}
- * structure.
- *
- * @author Dimitris Andreou
- * @since 13.0
- */
-@Beta
-public abstract class Striped<L> {
- private Striped() {}
-
- /**
- * Returns the stripe that corresponds to the passed key. It is always guaranteed that if
- * {@code key1.equals(key2)}, then {@code get(key1) == get(key2)}.
- *
- * @param key an arbitrary, non-null key
- * @return the stripe that the passed key corresponds to
- */
- public abstract L get(Object key);
-
- /**
- * Returns the stripe at the specified index. Valid indexes are 0, inclusively, to
- * {@code size()}, exclusively.
- *
- * @param index the index of the stripe to return; must be in {@code [0...size())}
- * @return the stripe at the specified index
- */
- public abstract L getAt(int index);
-
- /**
- * Returns the index to which the given key is mapped, so that getAt(indexFor(key)) == get(key).
- */
- abstract int indexFor(Object key);
-
- /**
- * Returns the total number of stripes in this instance.
- */
- public abstract int size();
-
- /**
- * Returns the stripes that correspond to the passed objects, in ascending (as per
- * {@link #getAt(int)}) order. Thus, threads that use the stripes in the order returned
- * by this method are guaranteed to not deadlock each other.
- *
- * <p>It should be noted that using a {@code Striped<L>} with relatively few stripes, and
- * {@code bulkGet(keys)} with a relative large number of keys can cause an excessive number
- * of shared stripes (much like the birthday paradox, where much fewer than anticipated birthdays
- * are needed for a pair of them to match). Please consider carefully the implications of the
- * number of stripes, the intended concurrency level, and the typical number of keys used in a
- * {@code bulkGet(keys)} operation. See <a href="http://www.mathpages.com/home/kmath199.htm">Balls
- * in Bins model</a> for mathematical formulas that can be used to estimate the probability of
- * collisions.
- *
- * @param keys arbitrary non-null keys
- * @return the stripes corresponding to the objects (one per each object, derived by delegating
- * to {@link #get(Object)}; may contain duplicates), in an increasing index order.
- */
- public Iterable<L> bulkGet(Iterable<?> keys) {
- // Initially using the array to store the keys, then reusing it to store the respective L's
- final Object[] array = Iterables.toArray(keys, Object.class);
- int[] stripes = new int[array.length];
- for (int i = 0; i < array.length; i++) {
- stripes[i] = indexFor(array[i]);
- }
- Arrays.sort(stripes);
- for (int i = 0; i < array.length; i++) {
- array[i] = getAt(stripes[i]);
- }
- /*
- * Note that the returned Iterable holds references to the returned stripes, to avoid
- * error-prone code like:
- *
- * Striped<Lock> stripedLock = Striped.lazyWeakXXX(...)'
- * Iterable<Lock> locks = stripedLock.bulkGet(keys);
- * for (Lock lock : locks) {
- * lock.lock();
- * }
- * operation();
- * for (Lock lock : locks) {
- * lock.unlock();
- * }
- *
- * If we only held the int[] stripes, translating it on the fly to L's, the original locks
- * might be garbage collected after locking them, ending up in a huge mess.
- */
- @SuppressWarnings("unchecked") // we carefully replaced all keys with their respective L's
- List<L> asList = (List<L>) Arrays.asList(array);
- return Collections.unmodifiableList(asList);
- }
-
- // Static factories
-
- /**
- * Creates a {@code Striped<Lock>} with eagerly initialized, strongly referenced locks, with the
- * specified fairness. Every lock is reentrant.
- *
- * @param stripes the minimum number of stripes (locks) required
- * @return a new {@code Striped<Lock>}
- */
- public static Striped<Lock> lock(int stripes) {
- return new CompactStriped<Lock>(stripes, new Supplier<Lock>() {
- public Lock get() {
- return new PaddedLock();
- }
- });
- }
-
- /**
- * Creates a {@code Striped<Lock>} with lazily initialized, weakly referenced locks, with the
- * specified fairness. Every lock is reentrant.
- *
- * @param stripes the minimum number of stripes (locks) required
- * @return a new {@code Striped<Lock>}
- */
- public static Striped<Lock> lazyWeakLock(int stripes) {
- return new LazyStriped<Lock>(stripes, new Supplier<Lock>() {
- public Lock get() {
- return new ReentrantLock(false);
- }
- });
- }
-
- /**
- * Creates a {@code Striped<Semaphore>} with eagerly initialized, strongly referenced semaphores,
- * with the specified number of permits and fairness.
- *
- * @param stripes the minimum number of stripes (semaphores) required
- * @param permits the number of permits in each semaphore
- * @return a new {@code Striped<Semaphore>}
- */
- public static Striped<Semaphore> semaphore(int stripes, final int permits) {
- return new CompactStriped<Semaphore>(stripes, new Supplier<Semaphore>() {
- public Semaphore get() {
- return new PaddedSemaphore(permits);
- }
- });
- }
-
- /**
- * Creates a {@code Striped<Semaphore>} with lazily initialized, weakly referenced semaphores,
- * with the specified number of permits and fairness.
- *
- * @param stripes the minimum number of stripes (semaphores) required
- * @param permits the number of permits in each semaphore
- * @return a new {@code Striped<Semaphore>}
- */
- public static Striped<Semaphore> lazyWeakSemaphore(int stripes, final int permits) {
- return new LazyStriped<Semaphore>(stripes, new Supplier<Semaphore>() {
- public Semaphore get() {
- return new Semaphore(permits, false);
- }
- });
- }
-
- /**
- * Creates a {@code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced
- * read-write locks, with the specified fairness. Every lock is reentrant.
- *
- * @param stripes the minimum number of stripes (locks) required
- * @return a new {@code Striped<ReadWriteLock>}
- */
- public static Striped<ReadWriteLock> readWriteLock(int stripes) {
- return new CompactStriped<ReadWriteLock>(stripes, READ_WRITE_LOCK_SUPPLIER);
- }
-
- /**
- * Creates a {@code Striped<ReadWriteLock>} with lazily initialized, weakly referenced
- * read-write locks, with the specified fairness. Every lock is reentrant.
- *
- * @param stripes the minimum number of stripes (locks) required
- * @return a new {@code Striped<ReadWriteLock>}
- */
- public static Striped<ReadWriteLock> lazyWeakReadWriteLock(int stripes) {
- return new LazyStriped<ReadWriteLock>(stripes, READ_WRITE_LOCK_SUPPLIER);
- }
-
- // ReentrantReadWriteLock is large enough to make padding probably unnecessary
- private static final Supplier<ReadWriteLock> READ_WRITE_LOCK_SUPPLIER =
- new Supplier<ReadWriteLock>() {
- public ReadWriteLock get() {
- return new ReentrantReadWriteLock();
- }
- };
-
- private abstract static class PowerOfTwoStriped<L> extends Striped<L> {
- /** Capacity (power of two) minus one, for fast mod evaluation */
- final int mask;
-
- PowerOfTwoStriped(int stripes) {
- Preconditions.checkArgument(stripes > 0, "Stripes must be positive");
- this.mask = stripes > Ints.MAX_POWER_OF_TWO ? ALL_SET : ceilToPowerOfTwo(stripes) - 1;
- }
-
- @Override final int indexFor(Object key) {
- int hash = smear(key.hashCode());
- return hash & mask;
- }
-
- @Override public final L get(Object key) {
- return getAt(indexFor(key));
- }
- }
-
- /**
- * Implementation of Striped where 2^k stripes are represented as an array of the same length,
- * eagerly initialized.
- */
- private static class CompactStriped<L> extends PowerOfTwoStriped<L> {
- /** Size is a power of two. */
- private final Object[] array;
-
- private CompactStriped(int stripes, Supplier<L> supplier) {
- super(stripes);
- Preconditions.checkArgument(stripes <= Ints.MAX_POWER_OF_TWO, "Stripes must be <= 2^30)");
-
- this.array = new Object[mask + 1];
- for (int i = 0; i < array.length; i++) {
- array[i] = supplier.get();
- }
- }
-
- @SuppressWarnings("unchecked") // we only put L's in the array
- @Override public L getAt(int index) {
- return (L) array[index];
- }
-
- @Override public int size() {
- return array.length;
- }
- }
-
- /**
- * Implementation of Striped where up to 2^k stripes can be represented, using a Cache
- * where the key domain is [0..2^k). To map a user key into a stripe, we take a k-bit slice of the
- * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
- */
- private static class LazyStriped<L> extends PowerOfTwoStriped<L> {
- final ConcurrentMap<Integer, L> cache;
- final int size;
-
- LazyStriped(int stripes, Supplier<L> supplier) {
- super(stripes);
- this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
- this.cache = new MapMaker().weakValues().makeComputingMap(Functions.forSupplier(supplier));
- }
-
- @Override public L getAt(int index) {
- Preconditions.checkElementIndex(index, size());
- return cache.get(index);
- }
-
- @Override public int size() {
- return size;
- }
- }
-
- /**
- * A bit mask were all bits are set.
- */
- private static final int ALL_SET = ~0;
-
- private static int ceilToPowerOfTwo(int x) {
- return 1 << IntMath.log2(x, RoundingMode.CEILING);
- }
-
- /*
- * This method was written by Doug Lea with assistance from members of JCP
- * JSR-166 Expert Group and released to the public domain, as explained at
- * http://creativecommons.org/licenses/publicdomain
- *
- * As of 2010/06/11, this method is identical to the (package private) hash
- * method in OpenJDK 7's java.util.HashMap class.
- */
- // Copied from java/com/google/common/collect/Hashing.java
- private static int smear(int hashCode) {
- hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
- return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
- }
-
- private static class PaddedLock extends ReentrantLock {
- /*
- * Padding from 40 into 64 bytes, same size as cache line. Might be beneficial to add
- * a fourth long here, to minimize chance of interference between consecutive locks,
- * but I couldn't observe any benefit from that.
- */
- @SuppressWarnings("unused")
- long q1, q2, q3;
-
- PaddedLock() {
- super(false);
- }
- }
-
- private static class PaddedSemaphore extends Semaphore {
- // See PaddedReentrantLock comment
- @SuppressWarnings("unused")
- long q1, q2, q3;
-
- PaddedSemaphore(int permits) {
- super(permits, false);
- }
- }
-}
diff --git a/guava/src/com/google/common/util/concurrent/ThreadFactoryBuilder.java b/guava/src/com/google/common/util/concurrent/ThreadFactoryBuilder.java
index a05247e..167ad11 100644
--- a/guava/src/com/google/common/util/concurrent/ThreadFactoryBuilder.java
+++ b/guava/src/com/google/common/util/concurrent/ThreadFactoryBuilder.java
@@ -61,12 +61,9 @@ public final class ThreadFactoryBuilder {
* @param nameFormat a {@link String#format(String, Object...)}-compatible
* format String, to which a unique integer (0, 1, etc.) will be supplied
* as the single parameter. This integer will be unique to the built
- * instance of the ThreadFactory and will be assigned sequentially. For
- * example, {@code "rpc-pool-%d"} will generate thread names like
- * {@code "rpc-pool-0"}, {@code "rpc-pool-1"}, {@code "rpc-pool-2"}, etc.
+ * instance of the ThreadFactory and will be assigned sequentially.
* @return this for the builder pattern
*/
- @SuppressWarnings("ReturnValueIgnored")
public ThreadFactoryBuilder setNameFormat(String nameFormat) {
String.format(nameFormat, 0); // fail fast if the format is bad or null
this.nameFormat = nameFormat;
diff --git a/guava/src/com/google/common/util/concurrent/UncheckedExecutionException.java b/guava/src/com/google/common/util/concurrent/UncheckedExecutionException.java
index 77afdc3..c0c99e1 100644
--- a/guava/src/com/google/common/util/concurrent/UncheckedExecutionException.java
+++ b/guava/src/com/google/common/util/concurrent/UncheckedExecutionException.java
@@ -16,10 +16,9 @@
package com.google.common.util.concurrent;
+import com.google.common.annotations.Beta;
import com.google.common.annotations.GwtCompatible;
-import javax.annotation.Nullable;
-
/**
* Unchecked variant of {@link java.util.concurrent.ExecutionException}. As with
* {@code ExecutionException}, the exception's {@linkplain #getCause() cause}
@@ -27,16 +26,17 @@ import javax.annotation.Nullable;
*
* <p>{@code UncheckedExecutionException} is intended as an alternative to
* {@code ExecutionException} when the exception thrown by a task is an
- * unchecked exception. However, it may also wrap a checked exception in some
- * cases.
+ * unchecked exception. This allows the client code to continue to distinguish
+ * between checked and unchecked exceptions, even when they come from other
+ * threads.
*
* <p>When wrapping an {@code Error} from another thread, prefer {@link
- * ExecutionError}. When wrapping a checked exception, prefer {@code
- * ExecutionException}.
+ * ExecutionError}.
*
* @author Charles Fry
* @since 10.0
*/
+@Beta
@GwtCompatible
public class UncheckedExecutionException extends RuntimeException {
/**
@@ -47,21 +47,21 @@ public class UncheckedExecutionException extends RuntimeException {
/**
* Creates a new instance with the given detail message.
*/
- protected UncheckedExecutionException(@Nullable String message) {
+ protected UncheckedExecutionException(String message) {
super(message);
}
/**
* Creates a new instance with the given detail message and cause.
*/
- public UncheckedExecutionException(@Nullable String message, @Nullable Throwable cause) {
+ public UncheckedExecutionException(String message, Throwable cause) {
super(message, cause);
}
/**
* Creates a new instance with the given cause.
*/
- public UncheckedExecutionException(@Nullable Throwable cause) {
+ public UncheckedExecutionException(Throwable cause) {
super(cause);
}
diff --git a/guava/src/com/google/common/util/concurrent/UncheckedTimeoutException.java b/guava/src/com/google/common/util/concurrent/UncheckedTimeoutException.java
index cefe379..d821c84 100644
--- a/guava/src/com/google/common/util/concurrent/UncheckedTimeoutException.java
+++ b/guava/src/com/google/common/util/concurrent/UncheckedTimeoutException.java
@@ -16,8 +16,6 @@
package com.google.common.util.concurrent;
-import javax.annotation.Nullable;
-
/**
* Unchecked version of {@link java.util.concurrent.TimeoutException}.
*
@@ -27,15 +25,15 @@ import javax.annotation.Nullable;
public class UncheckedTimeoutException extends RuntimeException {
public UncheckedTimeoutException() {}
- public UncheckedTimeoutException(@Nullable String message) {
+ public UncheckedTimeoutException(String message) {
super(message);
}
- public UncheckedTimeoutException(@Nullable Throwable cause) {
+ public UncheckedTimeoutException(Throwable cause) {
super(cause);
}
- public UncheckedTimeoutException(@Nullable String message, @Nullable Throwable cause) {
+ public UncheckedTimeoutException(String message, Throwable cause) {
super(message, cause);
}
diff --git a/guava/src/com/google/common/util/concurrent/Uninterruptibles.java b/guava/src/com/google/common/util/concurrent/Uninterruptibles.java
index 79d7598..89f30b8 100644
--- a/guava/src/com/google/common/util/concurrent/Uninterruptibles.java
+++ b/guava/src/com/google/common/util/concurrent/Uninterruptibles.java
@@ -122,9 +122,6 @@ public final class Uninterruptibles {
* <p>If instead, you wish to treat {@link InterruptedException} uniformly
* with other exceptions, see {@link Futures#get(Future, Class) Futures.get}
* or {@link Futures#makeChecked}.
- *
- * @throws ExecutionException if the computation threw an exception
- * @throws CancellationException if the computation was cancelled
*/
public static <V> V getUninterruptibly(Future<V> future)
throws ExecutionException {
@@ -152,10 +149,6 @@ public final class Uninterruptibles {
* <p>If instead, you wish to treat {@link InterruptedException} uniformly
* with other exceptions, see {@link Futures#get(Future, Class) Futures.get}
* or {@link Futures#makeChecked}.
- *
- * @throws ExecutionException if the computation threw an exception
- * @throws CancellationException if the computation was cancelled
- * @throws TimeoutException if the wait timed out
*/
public static <V> V getUninterruptibly(
Future<V> future, long timeout, TimeUnit unit)
@@ -233,11 +226,6 @@ public final class Uninterruptibles {
/**
* Invokes {@code queue.}{@link BlockingQueue#put(Object) put(element)}
* uninterruptibly.
- *
- * @throws ClassCastException if the class of the specified element prevents
- * it from being added to the given queue
- * @throws IllegalArgumentException if some property of the specified element
- * prevents it from being added to the given queue
*/
public static <E> void putUninterruptibly(BlockingQueue<E> queue, E element) {
boolean interrupted = false;
diff --git a/guava/src/com/google/common/util/concurrent/package-info.java b/guava/src/com/google/common/util/concurrent/package-info.java
index 6ea5069..6281552 100644
--- a/guava/src/com/google/common/util/concurrent/package-info.java
+++ b/guava/src/com/google/common/util/concurrent/package-info.java
@@ -33,4 +33,3 @@
package com.google.common.util.concurrent;
import javax.annotation.ParametersAreNonnullByDefault;
-