diff options
Diffstat (limited to 'guava/src/com/google/common/util')
32 files changed, 789 insertions, 4569 deletions
diff --git a/guava/src/com/google/common/util/concurrent/AbstractExecutionThreadService.java b/guava/src/com/google/common/util/concurrent/AbstractExecutionThreadService.java index a1cb641..8136d23 100644 --- a/guava/src/com/google/common/util/concurrent/AbstractExecutionThreadService.java +++ b/guava/src/com/google/common/util/concurrent/AbstractExecutionThreadService.java @@ -55,8 +55,7 @@ public abstract class AbstractExecutionThreadService implements Service { shutDown(); } catch (Exception ignored) { logger.log(Level.WARNING, - "Error while attempting to shut down the service" - + " after failure.", ignored); + "Error while attempting to shut down the service after failure.", ignored); } throw t; } @@ -78,14 +77,7 @@ public abstract class AbstractExecutionThreadService implements Service { }; /** - * Constructor for use by subclasses. - */ - protected AbstractExecutionThreadService() {} - - /** * Start the service. This method is invoked on the execution thread. - * - * <p>By default this method does nothing. */ protected void startUp() throws Exception {} @@ -107,16 +99,12 @@ public abstract class AbstractExecutionThreadService implements Service { /** * Stop the service. This method is invoked on the execution thread. - * - * <p>By default this method does nothing. */ // TODO: consider supporting a TearDownTestCase-like API protected void shutDown() throws Exception {} /** * Invoked to request the service to stop. - * - * <p>By default this method does nothing. */ protected void triggerShutdown() {} @@ -129,19 +117,19 @@ public abstract class AbstractExecutionThreadService implements Service { * promptly. * * <p>The default implementation returns a new {@link Executor} that sets the - * name of its threads to the string returned by {@link #serviceName} + * name of its threads to the string returned by {@link #getServiceName} */ protected Executor executor() { return new Executor() { @Override public void execute(Runnable command) { - MoreExecutors.newThread(serviceName(), command).start(); + new Thread(command, getServiceName()).start(); } }; } @Override public String toString() { - return serviceName() + " [" + state() + "]"; + return getServiceName() + " [" + state() + "]"; } // We override instead of using ForwardingService so that these can be final. @@ -171,28 +159,14 @@ public abstract class AbstractExecutionThreadService implements Service { } /** - * @since 13.0 - */ - @Override public final void addListener(Listener listener, Executor executor) { - delegate.addListener(listener, executor); - } - - /** - * @since 14.0 - */ - @Override public final Throwable failureCause() { - return delegate.failureCause(); - } - - /** - * Returns the name of this service. {@link AbstractExecutionThreadService} - * may include the name in debugging output. + * Returns the name of this service. {@link AbstractExecutionThreadService} may include the name + * in debugging output. * * <p>Subclasses may override this method. * - * @since 14.0 (present in 10.0 as getServiceName) + * @since 10.0 */ - protected String serviceName() { + protected String getServiceName() { return getClass().getSimpleName(); } } diff --git a/guava/src/com/google/common/util/concurrent/AbstractFuture.java b/guava/src/com/google/common/util/concurrent/AbstractFuture.java index e14a111..bef3d3d 100644 --- a/guava/src/com/google/common/util/concurrent/AbstractFuture.java +++ b/guava/src/com/google/common/util/concurrent/AbstractFuture.java @@ -70,11 +70,6 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> { // The execution list to hold our executors. private final ExecutionList executionList = new ExecutionList(); - /** - * Constructor for use by subclasses. - */ - protected AbstractFuture() {} - /* * Improve the documentation of when InterruptedException is thrown. Our * behavior matches the JDK's, but the JDK's documentation is misleading. @@ -128,7 +123,7 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> { @Override public boolean cancel(boolean mayInterruptIfRunning) { - if (!sync.cancel(mayInterruptIfRunning)) { + if (!sync.cancel()) { return false; } executionList.execute(); @@ -151,16 +146,6 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> { } /** - * Returns true if this future was cancelled with {@code - * mayInterruptIfRunning} set to {@code true}. - * - * @since 14.0 - */ - protected final boolean wasInterrupted() { - return sync.wasInterrupted(); - } - - /** * {@inheritDoc} * * @since 10.0 @@ -216,14 +201,13 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> { * private subclass to hold the synchronizer. This synchronizer is used to * implement the blocking and waiting calls as well as to handle state changes * in a thread-safe manner. The current state of the future is held in the - * Sync state, and the lock is released whenever the state changes to - * {@link #COMPLETED}, {@link #CANCELLED}, or {@link #INTERRUPTED} + * Sync state, and the lock is released whenever the state changes to either + * {@link #COMPLETED} or {@link #CANCELLED}. * * <p>To avoid races between threads doing release and acquire, we transition * to the final state in two steps. One thread will successfully CAS from * RUNNING to COMPLETING, that thread will then set the result of the - * computation, and only then transition to COMPLETED, CANCELLED, or - * INTERRUPTED. + * computation, and only then transition to COMPLETED or CANCELLED. * * <p>We don't use the integer argument passed between acquire methods so we * pass around a -1 everywhere. @@ -237,7 +221,6 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> { static final int COMPLETING = 1; static final int COMPLETED = 2; static final int CANCELLED = 4; - static final int INTERRUPTED = 8; private V value; private Throwable exception; @@ -309,9 +292,7 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> { } case CANCELLED: - case INTERRUPTED: - throw cancellationExceptionWithCause( - "Task was cancelled.", exception); + throw new CancellationException("Task was cancelled."); default: throw new IllegalStateException( @@ -320,25 +301,17 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> { } /** - * Checks if the state is {@link #COMPLETED}, {@link #CANCELLED}, or {@link - * INTERRUPTED}. + * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}. */ boolean isDone() { - return (getState() & (COMPLETED | CANCELLED | INTERRUPTED)) != 0; + return (getState() & (COMPLETED | CANCELLED)) != 0; } /** - * Checks if the state is {@link #CANCELLED} or {@link #INTERRUPTED}. + * Checks if the state is {@link #CANCELLED}. */ boolean isCancelled() { - return (getState() & (CANCELLED | INTERRUPTED)) != 0; - } - - /** - * Checks if the state is {@link #INTERRUPTED}. - */ - boolean wasInterrupted() { - return getState() == INTERRUPTED; + return getState() == CANCELLED; } /** @@ -356,10 +329,10 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> { } /** - * Transition to the CANCELLED or INTERRUPTED state. + * Transition to the CANCELLED state. */ - boolean cancel(boolean interrupt) { - return complete(null, null, interrupt ? INTERRUPTED : CANCELLED); + boolean cancel() { + return complete(null, null, CANCELLED); } /** @@ -367,8 +340,7 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> { * be set but not both. The {@code finalState} is the state to change to * from {@link #RUNNING}. If the state is not in the RUNNING state we * return {@code false} after waiting for the state to be set to a valid - * final state ({@link #COMPLETED}, {@link #CANCELLED}, or {@link - * #INTERRUPTED}). + * final state ({@link #COMPLETED} or {@link #CANCELLED}). * * @param v the value to set as the result of the computation. * @param t the exception to set as the result of the computation. @@ -381,9 +353,7 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> { // If this thread successfully transitioned to COMPLETING, set the value // and exception and then release to the final state. this.value = v; - // Don't actually construct a CancellationException until necessary. - this.exception = ((finalState & (CANCELLED | INTERRUPTED)) != 0) - ? new CancellationException("Future.cancel() was called.") : t; + this.exception = t; releaseShared(finalState); } else if (getState() == COMPLETING) { // If some other thread is currently completing the future, block until @@ -393,11 +363,4 @@ public abstract class AbstractFuture<V> implements ListenableFuture<V> { return doCompletion; } } - - static final CancellationException cancellationExceptionWithCause( - @Nullable String message, @Nullable Throwable cause) { - CancellationException exception = new CancellationException(message); - exception.initCause(cause); - return exception; - } } diff --git a/guava/src/com/google/common/util/concurrent/AbstractIdleService.java b/guava/src/com/google/common/util/concurrent/AbstractIdleService.java index 96a6ff3..504a6bc 100644 --- a/guava/src/com/google/common/util/concurrent/AbstractIdleService.java +++ b/guava/src/com/google/common/util/concurrent/AbstractIdleService.java @@ -37,7 +37,7 @@ public abstract class AbstractIdleService implements Service { /* use AbstractService for state management */ private final Service delegate = new AbstractService() { @Override protected final void doStart() { - executor().execute(new Runnable() { + executor(State.STARTING).execute(new Runnable() { @Override public void run() { try { startUp(); @@ -51,7 +51,7 @@ public abstract class AbstractIdleService implements Service { } @Override protected final void doStop() { - executor().execute(new Runnable() { + executor(State.STOPPING).execute(new Runnable() { @Override public void run() { try { shutDown(); @@ -65,9 +65,6 @@ public abstract class AbstractIdleService implements Service { } }; - /** Constructor for use by subclasses. */ - protected AbstractIdleService() {} - /** Start the service. */ protected abstract void startUp() throws Exception; @@ -81,19 +78,22 @@ public abstract class AbstractIdleService implements Service { * priority. The returned executor's {@link Executor#execute(Runnable) * execute()} method is called when this service is started and stopped, * and should return promptly. + * + * @param state {@link Service.State#STARTING} or + * {@link Service.State#STOPPING}, used by the default implementation for + * naming the thread */ - protected Executor executor() { - final State state = state(); + protected Executor executor(final State state) { return new Executor() { @Override public void execute(Runnable command) { - MoreExecutors.newThread(serviceName() + " " + state, command).start(); + new Thread(command, getServiceName() + " " + state).start(); } }; } @Override public String toString() { - return serviceName() + " [" + state() + "]"; + return getServiceName() + " [" + state() + "]"; } // We override instead of using ForwardingService so that these can be final. @@ -122,27 +122,7 @@ public abstract class AbstractIdleService implements Service { return delegate.stopAndWait(); } - /** - * @since 13.0 - */ - @Override public final void addListener(Listener listener, Executor executor) { - delegate.addListener(listener, executor); - } - - /** - * @since 14.0 - */ - @Override public final Throwable failureCause() { - return delegate.failureCause(); - } - - /** - * Returns the name of this service. {@link AbstractIdleService} may include the name in debugging - * output. - * - * @since 14.0 - */ - protected String serviceName() { + private String getServiceName() { return getClass().getSimpleName(); } } diff --git a/guava/src/com/google/common/util/concurrent/AbstractListeningExecutorService.java b/guava/src/com/google/common/util/concurrent/AbstractListeningExecutorService.java index c3e33a5..24f596f 100644 --- a/guava/src/com/google/common/util/concurrent/AbstractListeningExecutorService.java +++ b/guava/src/com/google/common/util/concurrent/AbstractListeningExecutorService.java @@ -14,9 +14,7 @@ package com.google.common.util.concurrent; -import static com.google.common.util.concurrent.MoreExecutors.invokeAnyImpl; - -import com.google.common.annotations.Beta; +import static com.google.common.base.Preconditions.checkArgument; import java.util.ArrayList; import java.util.Collection; @@ -25,12 +23,11 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import javax.annotation.Nullable; - /** * Implements {@link ListeningExecutorService} execution methods atop the abstract {@link #execute} * method. More concretely, the {@code submit}, {@code invokeAny} and {@code invokeAll} methods @@ -40,17 +37,15 @@ import javax.annotation.Nullable; * termination. * * @author Doug Lea - * @since 14.0 */ -@Beta -public abstract class AbstractListeningExecutorService implements ListeningExecutorService { +abstract class AbstractListeningExecutorService implements ListeningExecutorService { @Override public ListenableFuture<?> submit(Runnable task) { ListenableFutureTask<Void> ftask = ListenableFutureTask.create(task, null); execute(ftask); return ftask; } - @Override public <T> ListenableFuture<T> submit(Runnable task, @Nullable T result) { + @Override public <T> ListenableFuture<T> submit(Runnable task, T result) { ListenableFutureTask<T> ftask = ListenableFutureTask.create(task, result); execute(ftask); return ftask; @@ -62,10 +57,82 @@ public abstract class AbstractListeningExecutorService implements ListeningExecu return ftask; } + /** + * the main mechanics of invokeAny. + */ + private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) + throws InterruptedException, ExecutionException, TimeoutException { + int ntasks = tasks.size(); + checkArgument(ntasks > 0); + List<Future<T>> futures = new ArrayList<Future<T>>(ntasks); + ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this); + + // For efficiency, especially in executors with limited + // parallelism, check to see if previously submitted tasks are + // done before submitting more of them. This interleaving + // plus the exception mechanics account for messiness of main + // loop. + + try { + // Record exceptions so that if we fail to obtain any + // result, we can throw the last exception we got. + ExecutionException ee = null; + long lastTime = timed ? System.nanoTime() : 0; + Iterator<? extends Callable<T>> it = tasks.iterator(); + + // Start one task for sure; the rest incrementally + futures.add(ecs.submit(it.next())); + --ntasks; + int active = 1; + + for (;;) { + Future<T> f = ecs.poll(); + if (f == null) { + if (ntasks > 0) { + --ntasks; + futures.add(ecs.submit(it.next())); + ++active; + } else if (active == 0) { + break; + } else if (timed) { + f = ecs.poll(nanos, TimeUnit.NANOSECONDS); + if (f == null) { + throw new TimeoutException(); + } + long now = System.nanoTime(); + nanos -= now - lastTime; + lastTime = now; + } else { + f = ecs.take(); + } + } + if (f != null) { + --active; + try { + return f.get(); + } catch (ExecutionException eex) { + ee = eex; + } catch (RuntimeException rex) { + ee = new ExecutionException(rex); + } + } + } + + if (ee == null) { + ee = new ExecutionException(null); + } + throw ee; + } finally { + for (Future<T> f : futures) { + f.cancel(true); + } + } + } + @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { try { - return invokeAnyImpl(this, tasks, false, 0); + return doInvokeAny(tasks, false, 0); } catch (TimeoutException cannotHappen) { throw new AssertionError(); } @@ -74,7 +141,7 @@ public abstract class AbstractListeningExecutorService implements ListeningExecu @Override public <T> T invokeAny( Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return invokeAnyImpl(this, tasks, true, unit.toNanos(timeout)); + return doInvokeAny(tasks, true, unit.toNanos(timeout)); } @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) diff --git a/guava/src/com/google/common/util/concurrent/AbstractScheduledService.java b/guava/src/com/google/common/util/concurrent/AbstractScheduledService.java index 949f76a..f847205 100644 --- a/guava/src/com/google/common/util/concurrent/AbstractScheduledService.java +++ b/guava/src/com/google/common/util/concurrent/AbstractScheduledService.java @@ -21,11 +21,9 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import java.util.concurrent.Callable; -import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; @@ -98,8 +96,9 @@ public abstract class AbstractScheduledService implements Service { * * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory * methods, these provide {@link Scheduler} instances for the common use case of running the - * service with a fixed schedule. If more flexibility is needed then consider subclassing - * {@link CustomScheduler}. + * service with a fixed schedule. If more flexibility is needed then consider subclassing the + * {@link CustomScheduler} abstract class in preference to creating your own {@link Scheduler} + * implementation. * * @author Luke Sandberg * @since 11.0 @@ -230,9 +229,6 @@ public abstract class AbstractScheduledService implements Service { } }; - /** Constructor for use by subclasses. */ - protected AbstractScheduledService() {} - /** * Run one iteration of the scheduled task. If any invocation of this method throws an exception, * the service will transition to the {@link Service.State#FAILED} state and this method will no @@ -240,19 +236,11 @@ public abstract class AbstractScheduledService implements Service { */ protected abstract void runOneIteration() throws Exception; - /** - * Start the service. - * - * <p>By default this method does nothing. - */ - protected void startUp() throws Exception {} + /** Start the service. */ + protected abstract void startUp() throws Exception; - /** - * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. - * - * <p>By default this method does nothing. - */ - protected void shutDown() throws Exception {} + /** Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. */ + protected abstract void shutDown() throws Exception; /** * Returns the {@link Scheduler} object used to configure this service. This method will only be @@ -262,56 +250,19 @@ public abstract class AbstractScheduledService implements Service { /** * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp}, - * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the - * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this - * service {@linkplain Service.State#TERMINATED terminates} or - * {@linkplain Service.State#TERMINATED fails}. Subclasses may override this method to supply a - * custom {@link ScheduledExecutorService} instance. This method is guaranteed to only be called - * once. + * {@link #runOneIteration} and {@link #shutDown} methods. The executor will not be + * {@link ScheduledExecutorService#shutdown} when this service stops. Subclasses may override this + * method to use a custom {@link ScheduledExecutorService} instance. * * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread - * pool that sets the name of the thread to the {@linkplain #serviceName() service name}. - * Also, the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the - * service {@linkplain Service.State#TERMINATED terminates} or - * {@linkplain Service.State#TERMINATED fails}. + * pool. This method will only be called once. */ protected ScheduledExecutorService executor() { - final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( - new ThreadFactory() { - @Override public Thread newThread(Runnable runnable) { - return MoreExecutors.newThread(serviceName(), runnable); - } - }); - // Add a listener to shutdown the executor after the service is stopped. This ensures that the - // JVM shutdown will not be prevented from exiting after this service has stopped or failed. - // Technically this listener is added after start() was called so it is a little gross, but it - // is called within doStart() so we know that the service cannot terminate or fail concurrently - // with adding this listener so it is impossible to miss an event that we are interested in. - addListener(new Listener() { - @Override public void starting() {} - @Override public void running() {} - @Override public void stopping(State from) {} - @Override public void terminated(State from) { - executor.shutdown(); - } - @Override public void failed(State from, Throwable failure) { - executor.shutdown(); - }}, MoreExecutors.sameThreadExecutor()); - return executor; + return Executors.newSingleThreadScheduledExecutor(); } - /** - * Returns the name of this service. {@link AbstractScheduledService} may include the name in - * debugging output. - * - * @since 14.0 - */ - protected String serviceName() { - return getClass().getSimpleName(); - } - @Override public String toString() { - return serviceName() + " [" + state() + "]"; + return getClass().getSimpleName() + " [" + state() + "]"; } // We override instead of using ForwardingService so that these can be final. @@ -341,20 +292,6 @@ public abstract class AbstractScheduledService implements Service { } /** - * @since 13.0 - */ - @Override public final void addListener(Listener listener, Executor executor) { - delegate.addListener(listener, executor); - } - - /** - * @since 14.0 - */ - @Override public final Throwable failureCause() { - return delegate.failureCause(); - } - - /** * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to * use a dynamically changing schedule. After every execution of the task, assuming it hasn't * been cancelled, the {@link #getNextSchedule} method will be called. diff --git a/guava/src/com/google/common/util/concurrent/AbstractService.java b/guava/src/com/google/common/util/concurrent/AbstractService.java index f028a59..f84b374 100644 --- a/guava/src/com/google/common/util/concurrent/AbstractService.java +++ b/guava/src/com/google/common/util/concurrent/AbstractService.java @@ -16,148 +16,68 @@ package com.google.common.util.concurrent; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.Beta; -import com.google.common.collect.Lists; -import com.google.common.collect.Queues; import com.google.common.util.concurrent.Service.State; // javadoc needs this -import java.util.List; -import java.util.Queue; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; -import javax.annotation.concurrent.Immutable; /** - * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} - * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} - * callbacks. Its subclasses must manage threads manually; consider - * {@link AbstractExecutionThreadService} if you need only a single execution thread. + * Base class for implementing services that can handle {@link #doStart} and + * {@link #doStop} requests, responding to them with {@link #notifyStarted()} + * and {@link #notifyStopped()} callbacks. Its subclasses must manage threads + * manually; consider {@link AbstractExecutionThreadService} if you need only a + * single execution thread. * * @author Jesse Wilson - * @author Luke Sandberg * @since 1.0 */ @Beta public abstract class AbstractService implements Service { - private static final Logger logger = Logger.getLogger(AbstractService.class.getName()); + private final ReentrantLock lock = new ReentrantLock(); private final Transition startup = new Transition(); private final Transition shutdown = new Transition(); /** - * The listeners to notify during a state transition. - */ - @GuardedBy("lock") - private final List<ListenerExecutorPair> listeners = Lists.newArrayList(); - - /** - * The queue of listeners that are waiting to be executed. - * - * <p>Enqueue operations should be protected by {@link #lock} while dequeue operations should be - * protected by the implicit lock on this object. Dequeue operations should be executed atomically - * with the execution of the {@link Runnable} and additionally the {@link #lock} should not be - * held when the listeners are being executed. Use {@link #executeListeners} for this operation. - * This is necessary to ensure that elements on the queue are executed in the correct order. - * Enqueue operations should be protected so that listeners are added in the correct order. We use - * a concurrent queue implementation so that enqueues can be executed concurrently with dequeues. + * The internal state, which equals external state unless + * shutdownWhenStartupFinishes is true. Guarded by {@code lock}. */ - @GuardedBy("queuedListeners") - private final Queue<Runnable> queuedListeners = Queues.newConcurrentLinkedQueue(); - + private State state = State.NEW; + /** - * The current state of the service. This should be written with the lock held but can be read - * without it because it is an immutable object in a volatile field. This is desirable so that - * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run - * without grabbing the lock. - * - * <p>To update this field correctly the lock must be held to guarantee that the state is - * consistent. + * If true, the user requested a shutdown while the service was still starting + * up. Guarded by {@code lock}. */ - @GuardedBy("lock") - private volatile StateSnapshot snapshot = new StateSnapshot(State.NEW); + private boolean shutdownWhenStartupFinishes = false; - /** Constructor for use by subclasses. */ - protected AbstractService() { - // Add a listener to update the futures. This needs to be added first so that it is executed - // before the other listeners. This way the other listeners can access the completed futures. - addListener( - new Listener() { - @Override public void starting() {} - - @Override public void running() { - startup.set(State.RUNNING); - } - - @Override public void stopping(State from) { - if (from == State.STARTING) { - startup.set(State.STOPPING); - } - } - - @Override public void terminated(State from) { - if (from == State.NEW) { - startup.set(State.TERMINATED); - } - shutdown.set(State.TERMINATED); - } - - @Override public void failed(State from, Throwable failure) { - switch (from) { - case STARTING: - startup.setException(failure); - shutdown.setException(new Exception("Service failed to start.", failure)); - break; - case RUNNING: - shutdown.setException(new Exception("Service failed while running", failure)); - break; - case STOPPING: - shutdown.setException(failure); - break; - case TERMINATED: /* fall-through */ - case FAILED: /* fall-through */ - case NEW: /* fall-through */ - default: - throw new AssertionError("Unexpected from state: " + from); - } - } - }, - MoreExecutors.sameThreadExecutor()); - } - /** - * This method is called by {@link #start} to initiate service startup. The invocation of this - * method should cause a call to {@link #notifyStarted()}, either during this method's run, or - * after it has returned. If startup fails, the invocation should cause a call to - * {@link #notifyFailed(Throwable)} instead. + * This method is called by {@link #start} to initiate service startup. The + * invocation of this method should cause a call to {@link #notifyStarted()}, + * either during this method's run, or after it has returned. If startup + * fails, the invocation should cause a call to {@link + * #notifyFailed(Throwable)} instead. * - * <p>This method should return promptly; prefer to do work on a different thread where it is - * convenient. It is invoked exactly once on service startup, even when {@link #start} is called - * multiple times. + * <p>This method should return promptly; prefer to do work on a different + * thread where it is convenient. It is invoked exactly once on service + * startup, even when {@link #start} is called multiple times. */ protected abstract void doStart(); /** - * This method should be used to initiate service shutdown. The invocation of this method should - * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has - * returned. If shutdown fails, the invocation should cause a call to - * {@link #notifyFailed(Throwable)} instead. + * This method should be used to initiate service shutdown. The invocation + * of this method should cause a call to {@link #notifyStopped()}, either + * during this method's run, or after it has returned. If shutdown fails, the + * invocation should cause a call to {@link #notifyFailed(Throwable)} instead. * - * <p> This method should return promptly; prefer to do work on a different thread where it is - * convenient. It is invoked exactly once on service shutdown, even when {@link #stop} is called - * multiple times. + * <p>This method should return promptly; prefer to do work on a different + * thread where it is convenient. It is invoked exactly once on service + * shutdown, even when {@link #stop} is called multiple times. */ protected abstract void doStop(); @@ -165,16 +85,15 @@ public abstract class AbstractService implements Service { public final ListenableFuture<State> start() { lock.lock(); try { - if (snapshot.state == State.NEW) { - snapshot = new StateSnapshot(State.STARTING); - starting(); + if (state == State.NEW) { + state = State.STARTING; doStart(); } } catch (Throwable startupFailure) { + // put the exception in the future, the user can get it via Future.get() notifyFailed(startupFailure); } finally { lock.unlock(); - executeListeners(); } return startup; @@ -184,33 +103,22 @@ public abstract class AbstractService implements Service { public final ListenableFuture<State> stop() { lock.lock(); try { - switch (snapshot.state) { - case NEW: - snapshot = new StateSnapshot(State.TERMINATED); - terminated(State.NEW); - break; - case STARTING: - snapshot = new StateSnapshot(State.STARTING, true, null); - stopping(State.STARTING); - break; - case RUNNING: - snapshot = new StateSnapshot(State.STOPPING); - stopping(State.RUNNING); - doStop(); - break; - case STOPPING: - case TERMINATED: - case FAILED: - // do nothing - break; - default: - throw new AssertionError("Unexpected state: " + snapshot.state); + if (state == State.NEW) { + state = State.TERMINATED; + startup.set(State.TERMINATED); + shutdown.set(State.TERMINATED); + } else if (state == State.STARTING) { + shutdownWhenStartupFinishes = true; + startup.set(State.STOPPING); + } else if (state == State.RUNNING) { + state = State.STOPPING; + doStop(); } } catch (Throwable shutdownFailure) { + // put the exception in the future, the user can get it via Future.get() notifyFailed(shutdownFailure); } finally { lock.unlock(); - executeListeners(); } return shutdown; @@ -227,91 +135,84 @@ public abstract class AbstractService implements Service { } /** - * Implementing classes should invoke this method once their service has started. It will cause - * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. + * Implementing classes should invoke this method once their service has + * started. It will cause the service to transition from {@link + * State#STARTING} to {@link State#RUNNING}. * - * @throws IllegalStateException if the service is not {@link State#STARTING}. + * @throws IllegalStateException if the service is not + * {@link State#STARTING}. */ protected final void notifyStarted() { lock.lock(); try { - if (snapshot.state != State.STARTING) { + if (state != State.STARTING) { IllegalStateException failure = new IllegalStateException( - "Cannot notifyStarted() when the service is " + snapshot.state); + "Cannot notifyStarted() when the service is " + state); notifyFailed(failure); throw failure; } - if (snapshot.shutdownWhenStartupFinishes) { - snapshot = new StateSnapshot(State.STOPPING); - // We don't call listeners here because we already did that when we set the - // shutdownWhenStartupFinishes flag. - doStop(); + state = State.RUNNING; + if (shutdownWhenStartupFinishes) { + stop(); } else { - snapshot = new StateSnapshot(State.RUNNING); - running(); + startup.set(State.RUNNING); } } finally { lock.unlock(); - executeListeners(); } } /** - * Implementing classes should invoke this method once their service has stopped. It will cause - * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}. + * Implementing classes should invoke this method once their service has + * stopped. It will cause the service to transition from {@link + * State#STOPPING} to {@link State#TERMINATED}. * - * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor - * {@link State#RUNNING}. + * @throws IllegalStateException if the service is neither {@link + * State#STOPPING} nor {@link State#RUNNING}. */ protected final void notifyStopped() { lock.lock(); try { - if (snapshot.state != State.STOPPING && snapshot.state != State.RUNNING) { + if (state != State.STOPPING && state != State.RUNNING) { IllegalStateException failure = new IllegalStateException( - "Cannot notifyStopped() when the service is " + snapshot.state); + "Cannot notifyStopped() when the service is " + state); notifyFailed(failure); throw failure; } - State previous = snapshot.state; - snapshot = new StateSnapshot(State.TERMINATED); - terminated(previous); + + state = State.TERMINATED; + shutdown.set(State.TERMINATED); } finally { lock.unlock(); - executeListeners(); } } /** - * Invoke this method to transition the service to the {@link State#FAILED}. The service will - * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically - * or otherwise cannot be started nor stopped. + * Invoke this method to transition the service to the + * {@link State#FAILED}. The service will <b>not be stopped</b> if it + * is running. Invoke this method when a service has failed critically or + * otherwise cannot be started nor stopped. */ protected final void notifyFailed(Throwable cause) { checkNotNull(cause); lock.lock(); try { - switch (snapshot.state) { - case NEW: - case TERMINATED: - throw new IllegalStateException("Failed while in state:" + snapshot.state, cause); - case RUNNING: - case STARTING: - case STOPPING: - State previous = snapshot.state; - snapshot = new StateSnapshot(State.FAILED, false, cause); - failed(previous, cause); - break; - case FAILED: - // Do nothing - break; - default: - throw new AssertionError("Unexpected state: " + snapshot.state); + if (state == State.STARTING) { + startup.setException(cause); + shutdown.setException(new Exception( + "Service failed to start.", cause)); + } else if (state == State.STOPPING) { + shutdown.setException(cause); + } else if (state == State.RUNNING) { + shutdown.setException(new Exception("Service failed while running", cause)); + } else if (state == State.NEW || state == State.TERMINATED) { + throw new IllegalStateException("Failed while in state:" + state, cause); } + state = State.FAILED; } finally { lock.unlock(); - executeListeners(); } } @@ -322,28 +223,12 @@ public abstract class AbstractService implements Service { @Override public final State state() { - return snapshot.externalState(); - } - - /** - * @since 14.0 - */ - @Override - public final Throwable failureCause() { - return snapshot.failureCause(); - } - - /** - * @since 13.0 - */ - @Override - public final void addListener(Listener listener, Executor executor) { - checkNotNull(listener, "listener"); - checkNotNull(executor, "executor"); lock.lock(); try { - if (snapshot.state != State.TERMINATED && snapshot.state != State.FAILED) { - listeners.add(new ListenerExecutorPair(listener, executor)); + if (shutdownWhenStartupFinishes && state == State.STARTING) { + return State.STOPPING; + } else { + return state; } } finally { lock.unlock(); @@ -368,182 +253,4 @@ public abstract class AbstractService implements Service { } } } - - /** - * Attempts to execute all the listeners in {@link #queuedListeners} while not holding the - * {@link #lock}. - */ - private void executeListeners() { - if (!lock.isHeldByCurrentThread()) { - synchronized (queuedListeners) { - Runnable listener; - while ((listener = queuedListeners.poll()) != null) { - listener.run(); - } - } - } - } - - @GuardedBy("lock") - private void starting() { - for (final ListenerExecutorPair pair : listeners) { - queuedListeners.add(new Runnable() { - @Override public void run() { - pair.execute(new Runnable() { - @Override public void run() { - pair.listener.starting(); - } - }); - } - }); - } - } - - @GuardedBy("lock") - private void running() { - for (final ListenerExecutorPair pair : listeners) { - queuedListeners.add(new Runnable() { - @Override public void run() { - pair.execute(new Runnable() { - @Override public void run() { - pair.listener.running(); - } - }); - } - }); - } - } - - @GuardedBy("lock") - private void stopping(final State from) { - for (final ListenerExecutorPair pair : listeners) { - queuedListeners.add(new Runnable() { - @Override public void run() { - pair.execute(new Runnable() { - @Override public void run() { - pair.listener.stopping(from); - } - }); - } - }); - } - } - - @GuardedBy("lock") - private void terminated(final State from) { - for (final ListenerExecutorPair pair : listeners) { - queuedListeners.add(new Runnable() { - @Override public void run() { - pair.execute(new Runnable() { - @Override public void run() { - pair.listener.terminated(from); - } - }); - } - }); - } - // There are no more state transitions so we can clear this out. - listeners.clear(); - } - - @GuardedBy("lock") - private void failed(final State from, final Throwable cause) { - for (final ListenerExecutorPair pair : listeners) { - queuedListeners.add(new Runnable() { - @Override public void run() { - pair.execute(new Runnable() { - @Override public void run() { - pair.listener.failed(from, cause); - } - }); - } - }); - } - // There are no more state transitions so we can clear this out. - listeners.clear(); - } - - /** A simple holder for a listener and its executor. */ - private static class ListenerExecutorPair { - final Listener listener; - final Executor executor; - - ListenerExecutorPair(Listener listener, Executor executor) { - this.listener = listener; - this.executor = executor; - } - - /** - * Executes the given {@link Runnable} on {@link #executor} logging and swallowing all - * exceptions - */ - void execute(Runnable runnable) { - try { - executor.execute(runnable); - } catch (Exception e) { - logger.log(Level.SEVERE, "Exception while executing listener " + listener - + " with executor " + executor, e); - } - } - } - - /** - * An immutable snapshot of the current state of the service. This class represents a consistent - * snapshot of the state and therefore it can be used to answer simple queries without needing to - * grab a lock. - */ - @Immutable - private static final class StateSnapshot { - /** - * The internal state, which equals external state unless - * shutdownWhenStartupFinishes is true. - */ - final State state; - - /** - * If true, the user requested a shutdown while the service was still starting - * up. - */ - final boolean shutdownWhenStartupFinishes; - - /** - * The exception that caused this service to fail. This will be {@code null} - * unless the service has failed. - */ - @Nullable - final Throwable failure; - - StateSnapshot(State internalState) { - this(internalState, false, null); - } - - StateSnapshot( - State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) { - checkArgument(!shutdownWhenStartupFinishes || internalState == State.STARTING, - "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", - internalState); - checkArgument(!(failure != null ^ internalState == State.FAILED), - "A failure cause should be set if and only if the state is failed. Got %s and %s " - + "instead.", internalState, failure); - this.state = internalState; - this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; - this.failure = failure; - } - - /** @see Service#state() */ - State externalState() { - if (shutdownWhenStartupFinishes && state == State.STARTING) { - return State.STOPPING; - } else { - return state; - } - } - - /** @see Service#failureCause() */ - Throwable failureCause() { - checkState(state == State.FAILED, - "failureCause() is only valid if the service has failed, service is %s", state); - return failure; - } - } } diff --git a/guava/src/com/google/common/util/concurrent/AsyncFunction.java b/guava/src/com/google/common/util/concurrent/AsyncFunction.java index cdb1228..441c029 100644 --- a/guava/src/com/google/common/util/concurrent/AsyncFunction.java +++ b/guava/src/com/google/common/util/concurrent/AsyncFunction.java @@ -16,6 +16,8 @@ package com.google.common.util.concurrent; +import com.google.common.annotations.Beta; + import java.util.concurrent.Future; /** @@ -25,6 +27,7 @@ import java.util.concurrent.Future; * @author Chris Povirk * @since 11.0 */ +@Beta public interface AsyncFunction<I, O> { /** * Returns an output {@code Future} to use in place of the given {@code diff --git a/guava/src/com/google/common/util/concurrent/AtomicDouble.java b/guava/src/com/google/common/util/concurrent/AtomicDouble.java index d007f45..0f38fb9 100644 --- a/guava/src/com/google/common/util/concurrent/AtomicDouble.java +++ b/guava/src/com/google/common/util/concurrent/AtomicDouble.java @@ -14,9 +14,10 @@ package com.google.common.util.concurrent; +import com.google.common.annotations.Beta; + import static java.lang.Double.doubleToRawLongBits; import static java.lang.Double.longBitsToDouble; - import java.util.concurrent.atomic.AtomicLongFieldUpdater; /** @@ -42,16 +43,17 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; * * <p>It is possible to write a more scalable updater, at the cost of * giving up strict atomicity. See for example - * <a href="http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166edocs/jsr166e/DoubleAdder.html"> - * DoubleAdder</a> + * <a href="http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166edocs/jsr166e/DoubleAdder.html" + * DoubleAdder> * and - * <a href="http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166edocs/jsr166e/DoubleMaxUpdater.html"> - * DoubleMaxUpdater</a>. + * <a href="http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166edocs/jsr166e/DoubleMaxUpdater.html" + * DoubleMaxUpdater>. * * @author Doug Lea * @author Martin Buchholz * @since 11.0 */ +@Beta public class AtomicDouble extends Number implements java.io.Serializable { private static final long serialVersionUID = 0L; diff --git a/guava/src/com/google/common/util/concurrent/AtomicDoubleArray.java b/guava/src/com/google/common/util/concurrent/AtomicDoubleArray.java index 407cd7c..37f4c5c 100644 --- a/guava/src/com/google/common/util/concurrent/AtomicDoubleArray.java +++ b/guava/src/com/google/common/util/concurrent/AtomicDoubleArray.java @@ -13,9 +13,10 @@ package com.google.common.util.concurrent; +import com.google.common.annotations.Beta; + import static java.lang.Double.doubleToRawLongBits; import static java.lang.Double.longBitsToDouble; - import java.util.concurrent.atomic.AtomicLongArray; /** @@ -39,6 +40,7 @@ import java.util.concurrent.atomic.AtomicLongArray; * @author Martin Buchholz * @since 11.0 */ +@Beta public class AtomicDoubleArray implements java.io.Serializable { private static final long serialVersionUID = 0L; diff --git a/guava/src/com/google/common/util/concurrent/AtomicLongMap.java b/guava/src/com/google/common/util/concurrent/AtomicLongMap.java index d0af965..c49f84c 100644 --- a/guava/src/com/google/common/util/concurrent/AtomicLongMap.java +++ b/guava/src/com/google/common/util/concurrent/AtomicLongMap.java @@ -1,24 +1,10 @@ -/* - * Copyright (C) 2011 The Guava Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +// Copyright 2011 Google Inc. All Rights Reserved. package com.google.common.util.concurrent; import static com.google.common.base.Preconditions.checkNotNull; -import com.google.common.annotations.GwtCompatible; +import com.google.common.annotations.Beta; import com.google.common.base.Function; import com.google.common.collect.Maps; @@ -50,7 +36,7 @@ import java.util.concurrent.atomic.AtomicLong; * @author Charles Fry * @since 11.0 */ -@GwtCompatible +@Beta public final class AtomicLongMap<K> { private final ConcurrentHashMap<K, AtomicLong> map; diff --git a/guava/src/com/google/common/util/concurrent/Atomics.java b/guava/src/com/google/common/util/concurrent/Atomics.java index efb7946..fece83d 100644 --- a/guava/src/com/google/common/util/concurrent/Atomics.java +++ b/guava/src/com/google/common/util/concurrent/Atomics.java @@ -16,6 +16,8 @@ package com.google.common.util.concurrent; +import com.google.common.annotations.Beta; + import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceArray; @@ -28,6 +30,7 @@ import javax.annotation.Nullable; * @author Kurt Alfred Kluever * @since 10.0 */ +@Beta public final class Atomics { private Atomics() {} diff --git a/guava/src/com/google/common/util/concurrent/CycleDetectingLockFactory.java b/guava/src/com/google/common/util/concurrent/CycleDetectingLockFactory.java deleted file mode 100644 index 528fc8e..0000000 --- a/guava/src/com/google/common/util/concurrent/CycleDetectingLockFactory.java +++ /dev/null @@ -1,1038 +0,0 @@ -/* - * Copyright (C) 2011 The Guava Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.common.util.concurrent; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.annotations.Beta; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.MapMaker; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.EnumMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.annotation.Nullable; -import javax.annotation.concurrent.ThreadSafe; - -/** - * The {@code CycleDetectingLockFactory} creates {@link ReentrantLock} instances and - * {@link ReentrantReadWriteLock} instances that detect potential deadlock by checking - * for cycles in lock acquisition order. - * <p> - * Potential deadlocks detected when calling the {@code lock()}, - * {@code lockInterruptibly()}, or {@code tryLock()} methods will result in the - * execution of the {@link Policy} specified when creating the factory. The - * currently available policies are: - * <ul> - * <li>DISABLED - * <li>WARN - * <li>THROW - * </ul> - * The locks created by a factory instance will detect lock acquisition cycles - * with locks created by other {@code CycleDetectingLockFactory} instances - * (except those with {@code Policy.DISABLED}). A lock's behavior when a cycle - * is detected, however, is defined by the {@code Policy} of the factory that - * created it. This allows detection of cycles across components while - * delegating control over lock behavior to individual components. - * <p> - * Applications are encouraged to use a {@code CycleDetectingLockFactory} to - * create any locks for which external/unmanaged code is executed while the lock - * is held. (See caveats under <strong>Performance</strong>). - * <p> - * <strong>Cycle Detection</strong> - * <p> - * Deadlocks can arise when locks are acquired in an order that forms a cycle. - * In a simple example involving two locks and two threads, deadlock occurs - * when one thread acquires Lock A, and then Lock B, while another thread - * acquires Lock B, and then Lock A: - * <pre> - * Thread1: acquire(LockA) --X acquire(LockB) - * Thread2: acquire(LockB) --X acquire(LockA) - * </pre> - * Neither thread will progress because each is waiting for the other. In more - * complex applications, cycles can arise from interactions among more than 2 - * locks: - * <pre> - * Thread1: acquire(LockA) --X acquire(LockB) - * Thread2: acquire(LockB) --X acquire(LockC) - * ... - * ThreadN: acquire(LockN) --X acquire(LockA) - * </pre> - * The implementation detects cycles by constructing a directed graph in which - * each lock represents a node and each edge represents an acquisition ordering - * between two locks. - * <ul> - * <li>Each lock adds (and removes) itself to/from a ThreadLocal Set of acquired - * locks when the Thread acquires its first hold (and releases its last - * remaining hold). - * <li>Before the lock is acquired, the lock is checked against the current set - * of acquired locks---to each of the acquired locks, an edge from the - * soon-to-be-acquired lock is either verified or created. - * <li>If a new edge needs to be created, the outgoing edges of the acquired - * locks are traversed to check for a cycle that reaches the lock to be - * acquired. If no cycle is detected, a new "safe" edge is created. - * <li>If a cycle is detected, an "unsafe" (cyclic) edge is created to represent - * a potential deadlock situation, and the appropriate Policy is executed. - * </ul> - * Note that detection of potential deadlock does not necessarily indicate that - * deadlock will happen, as it is possible that higher level application logic - * prevents the cyclic lock acquisition from occurring. One example of a false - * positive is: - * <pre> - * LockA -> LockB -> LockC - * LockA -> LockC -> LockB - * </pre> - * - * <strong>ReadWriteLocks</strong> - * <p> - * While {@code ReadWriteLock} instances have different properties and can form cycles - * without potential deadlock, this class treats {@code ReadWriteLock} instances as - * equivalent to traditional exclusive locks. Although this increases the false - * positives that the locks detect (i.e. cycles that will not actually result in - * deadlock), it simplifies the algorithm and implementation considerably. The - * assumption is that a user of this factory wishes to eliminate any cyclic - * acquisition ordering. - * <p> - * <strong>Explicit Lock Acquisition Ordering</strong> - * <p> - * The {@link CycleDetectingLockFactory.WithExplicitOrdering} class can be used - * to enforce an application-specific ordering in addition to performing general - * cycle detection. - * <p> - * <strong>Garbage Collection</strong> - * <p> - * In order to allow proper garbage collection of unused locks, the edges of - * the lock graph are weak references. - * <p> - * <strong>Performance</strong> - * <p> - * The extra bookkeeping done by cycle detecting locks comes at some cost to - * performance. Benchmarks (as of December 2011) show that: - * - * <ul> - * <li>for an unnested {@code lock()} and {@code unlock()}, a cycle detecting - * lock takes 38ns as opposed to the 24ns taken by a plain lock. - * <li>for nested locking, the cost increases with the depth of the nesting: - * <ul> - * <li> 2 levels: average of 64ns per lock()/unlock() - * <li> 3 levels: average of 77ns per lock()/unlock() - * <li> 4 levels: average of 99ns per lock()/unlock() - * <li> 5 levels: average of 103ns per lock()/unlock() - * <li>10 levels: average of 184ns per lock()/unlock() - * <li>20 levels: average of 393ns per lock()/unlock() - * </ul> - * </ul> - * - * As such, the CycleDetectingLockFactory may not be suitable for - * performance-critical applications which involve tightly-looped or - * deeply-nested locking algorithms. - * - * @author Darick Tong - * @since 13.0 - */ -@Beta -@ThreadSafe -public class CycleDetectingLockFactory { - - /** - * Encapsulates the action to be taken when a potential deadlock is - * encountered. Clients can use one of the predefined {@link Policies} or - * specify a custom implementation. Implementations must be thread-safe. - * - * @since 13.0 - */ - @Beta - @ThreadSafe - public interface Policy { - - /** - * Called when a potential deadlock is encountered. Implementations can - * throw the given {@code exception} and/or execute other desired logic. - * <p> - * Note that the method will be called even upon an invocation of - * {@code tryLock()}. Although {@code tryLock()} technically recovers from - * deadlock by eventually timing out, this behavior is chosen based on the - * assumption that it is the application's wish to prohibit any cyclical - * lock acquisitions. - */ - void handlePotentialDeadlock(PotentialDeadlockException exception); - } - - /** - * Pre-defined {@link Policy} implementations. - * - * @since 13.0 - */ - @Beta - public enum Policies implements Policy { - /** - * When potential deadlock is detected, this policy results in the throwing - * of the {@code PotentialDeadlockException} indicating the potential - * deadlock, which includes stack traces illustrating the cycle in lock - * acquisition order. - */ - THROW { - @Override - public void handlePotentialDeadlock(PotentialDeadlockException e) { - throw e; - } - }, - - /** - * When potential deadlock is detected, this policy results in the logging - * of a {@link Level#SEVERE} message indicating the potential deadlock, - * which includes stack traces illustrating the cycle in lock acquisition - * order. - */ - WARN { - @Override - public void handlePotentialDeadlock(PotentialDeadlockException e) { - logger.log(Level.SEVERE, "Detected potential deadlock", e); - } - }, - - /** - * Disables cycle detection. This option causes the factory to return - * unmodified lock implementations provided by the JDK, and is provided to - * allow applications to easily parameterize when cycle detection is - * enabled. - * <p> - * Note that locks created by a factory with this policy will <em>not</em> - * participate the cycle detection performed by locks created by other - * factories. - */ - DISABLED { - @Override - public void handlePotentialDeadlock(PotentialDeadlockException e) { - } - }; - } - - /** - * Creates a new factory with the specified policy. - */ - public static CycleDetectingLockFactory newInstance(Policy policy) { - return new CycleDetectingLockFactory(policy); - } - - /** - * Equivalent to {@code newReentrantLock(lockName, false)}. - */ - public ReentrantLock newReentrantLock(String lockName) { - return newReentrantLock(lockName, false); - } - - /** - * Creates a {@link ReentrantLock} with the given fairness policy. The - * {@code lockName} is used in the warning or exception output to help - * identify the locks involved in the detected deadlock. - */ - public ReentrantLock newReentrantLock(String lockName, boolean fair) { - return policy == Policies.DISABLED ? new ReentrantLock(fair) - : new CycleDetectingReentrantLock( - new LockGraphNode(lockName), fair); - } - - /** - * Equivalent to {@code newReentrantReadWriteLock(lockName, false)}. - */ - public ReentrantReadWriteLock newReentrantReadWriteLock(String lockName) { - return newReentrantReadWriteLock(lockName, false); - } - - /** - * Creates a {@link ReentrantReadWriteLock} with the given fairness policy. - * The {@code lockName} is used in the warning or exception output to help - * identify the locks involved in the detected deadlock. - */ - public ReentrantReadWriteLock newReentrantReadWriteLock( - String lockName, boolean fair) { - return policy == Policies.DISABLED ? new ReentrantReadWriteLock(fair) - : new CycleDetectingReentrantReadWriteLock( - new LockGraphNode(lockName), fair); - } - - // A static mapping from an Enum type to its set of LockGraphNodes. - private static final Map<Class<? extends Enum>, - Map<? extends Enum, LockGraphNode>> lockGraphNodesPerType = - new MapMaker().weakKeys().makeComputingMap( - new OrderedLockGraphNodesCreator()); - - /** - * Creates a {@code CycleDetectingLockFactory.WithExplicitOrdering<E>}. - */ - public static <E extends Enum<E>> WithExplicitOrdering<E> - newInstanceWithExplicitOrdering(Class<E> enumClass, Policy policy) { - // OrderedLockGraphNodesCreator maps each enumClass to a Map with the - // corresponding enum key type. - checkNotNull(enumClass); - checkNotNull(policy); - @SuppressWarnings("unchecked") - Map<E, LockGraphNode> lockGraphNodes = - (Map<E, LockGraphNode>) lockGraphNodesPerType.get(enumClass); - return new WithExplicitOrdering<E>(policy, lockGraphNodes); - } - - /** - * A {@code CycleDetectingLockFactory.WithExplicitOrdering} provides the - * additional enforcement of an application-specified ordering of lock - * acquisitions. The application defines the allowed ordering with an - * {@code Enum} whose values each correspond to a lock type. The order in - * which the values are declared dictates the allowed order of lock - * acquisition. In other words, locks corresponding to smaller values of - * {@link Enum#ordinal()} should only be acquired before locks with larger - * ordinals. Example: - * - * <pre> {@code - * enum MyLockOrder { - * FIRST, SECOND, THIRD; - * } - * - * CycleDetectingLockFactory.WithExplicitOrdering<MyLockOrder> factory = - * CycleDetectingLockFactory.newInstanceWithExplicitOrdering(Policies.THROW); - * - * Lock lock1 = factory.newReentrantLock(MyLockOrder.FIRST); - * Lock lock2 = factory.newReentrantLock(MyLockOrder.SECOND); - * Lock lock3 = factory.newReentrantLock(MyLockOrder.THIRD); - * - * lock1.lock(); - * lock3.lock(); - * lock2.lock(); // will throw an IllegalStateException - * }</pre> - * - * As with all locks created by instances of {@code CycleDetectingLockFactory} - * explicitly ordered locks participate in general cycle detection with all - * other cycle detecting locks, and a lock's behavior when detecting a cyclic - * lock acquisition is defined by the {@code Policy} of the factory that - * created it. - * <p> - * Note, however, that although multiple locks can be created for a given Enum - * value, whether it be through separate factory instances or through multiple - * calls to the same factory, attempting to acquire multiple locks with the - * same Enum value (within the same thread) will result in an - * IllegalStateException regardless of the factory's policy. For example: - * - * <pre> {@code - * CycleDetectingLockFactory.WithExplicitOrdering<MyLockOrder> factory1 = - * CycleDetectingLockFactory.newInstanceWithExplicitOrdering(...); - * CycleDetectingLockFactory.WithExplicitOrdering<MyLockOrder> factory2 = - * CycleDetectingLockFactory.newInstanceWithExplicitOrdering(...); - * - * Lock lockA = factory1.newReentrantLock(MyLockOrder.FIRST); - * Lock lockB = factory1.newReentrantLock(MyLockOrder.FIRST); - * Lock lockC = factory2.newReentrantLock(MyLockOrder.FIRST); - * - * lockA.lock(); - * - * lockB.lock(); // will throw an IllegalStateException - * lockC.lock(); // will throw an IllegalStateException - * - * lockA.lock(); // reentrant acquisition is okay - * }</pre> - * - * It is the responsibility of the application to ensure that multiple lock - * instances with the same rank are never acquired in the same thread. - * - * @param <E> The Enum type representing the explicit lock ordering. - * @since 13.0 - */ - @Beta - public static final class WithExplicitOrdering<E extends Enum<E>> - extends CycleDetectingLockFactory { - - private final Map<E, LockGraphNode> lockGraphNodes; - - @VisibleForTesting - WithExplicitOrdering( - Policy policy, Map<E, LockGraphNode> lockGraphNodes) { - super(policy); - this.lockGraphNodes = lockGraphNodes; - } - - /** - * Equivalent to {@code newReentrantLock(rank, false)}. - */ - public ReentrantLock newReentrantLock(E rank) { - return newReentrantLock(rank, false); - } - - /** - * Creates a {@link ReentrantLock} with the given fairness policy and rank. - * The values returned by {@link Enum#getDeclaringClass()} and - * {@link Enum#name()} are used to describe the lock in warning or - * exception output. - * - * @throws IllegalStateException If the factory has already created a - * {@code Lock} with the specified rank. - */ - public ReentrantLock newReentrantLock(E rank, boolean fair) { - return policy == Policies.DISABLED ? new ReentrantLock(fair) - : new CycleDetectingReentrantLock(lockGraphNodes.get(rank), fair); - } - - /** - * Equivalent to {@code newReentrantReadWriteLock(rank, false)}. - */ - public ReentrantReadWriteLock newReentrantReadWriteLock(E rank) { - return newReentrantReadWriteLock(rank, false); - } - - /** - * Creates a {@link ReentrantReadWriteLock} with the given fairness policy - * and rank. The values returned by {@link Enum#getDeclaringClass()} and - * {@link Enum#name()} are used to describe the lock in warning or exception - * output. - * - * @throws IllegalStateException If the factory has already created a - * {@code Lock} with the specified rank. - */ - public ReentrantReadWriteLock newReentrantReadWriteLock( - E rank, boolean fair) { - return policy == Policies.DISABLED ? new ReentrantReadWriteLock(fair) - : new CycleDetectingReentrantReadWriteLock( - lockGraphNodes.get(rank), fair); - } - } - - /** - * For a given Enum type, creates an immutable map from each of the Enum's - * values to a corresponding LockGraphNode, with the - * {@code allowedPriorLocks} and {@code disallowedPriorLocks} prepopulated - * with nodes according to the natural ordering of the associated Enum values. - */ - @VisibleForTesting - static class OrderedLockGraphNodesCreator - implements Function<Class<? extends Enum>, - Map<? extends Enum, LockGraphNode>> { - - @Override - @SuppressWarnings("unchecked") // There's no way to properly express with - // wildcards the recursive Enum type required by createNodesFor(), and the - // Map/Function types must use wildcards since they accept any Enum class. - public Map<? extends Enum, LockGraphNode> apply( - Class<? extends Enum> clazz) { - return createNodesFor(clazz); - } - - <E extends Enum<E>> Map<E, LockGraphNode> createNodesFor(Class<E> clazz) { - EnumMap<E, LockGraphNode> map = Maps.newEnumMap(clazz); - E[] keys = clazz.getEnumConstants(); - final int numKeys = keys.length; - ArrayList<LockGraphNode> nodes = - Lists.newArrayListWithCapacity(numKeys); - // Create a LockGraphNode for each enum value. - for (E key : keys) { - LockGraphNode node = new LockGraphNode(getLockName(key)); - nodes.add(node); - map.put(key, node); - } - // Pre-populate all allowedPriorLocks with nodes of smaller ordinal. - for (int i = 1; i < numKeys; i++) { - nodes.get(i).checkAcquiredLocks(Policies.THROW, nodes.subList(0, i)); - } - // Pre-populate all disallowedPriorLocks with nodes of larger ordinal. - for (int i = 0; i < numKeys - 1; i++) { - nodes.get(i).checkAcquiredLocks( - Policies.DISABLED, nodes.subList(i + 1, numKeys)); - } - return Collections.unmodifiableMap(map); - } - - /** - * For the given Enum value {@code rank}, returns the value's - * {@code "EnumClass.name"}, which is used in exception and warning - * output. - */ - private String getLockName(Enum<?> rank) { - return rank.getDeclaringClass().getSimpleName() + "." + rank.name(); - } - } - - //////// Implementation ///////// - - private static final Logger logger = Logger.getLogger( - CycleDetectingLockFactory.class.getName()); - - final Policy policy; - - private CycleDetectingLockFactory(Policy policy) { - this.policy = checkNotNull(policy); - } - - /** - * Tracks the currently acquired locks for each Thread, kept up to date by - * calls to {@link #aboutToAcquire(CycleDetectingLock)} and - * {@link #lockStateChanged(CycleDetectingLock)}. - */ - // This is logically a Set, but an ArrayList is used to minimize the amount - // of allocation done on lock()/unlock(). - private static final ThreadLocal<ArrayList<LockGraphNode>> - acquiredLocks = new ThreadLocal<ArrayList<LockGraphNode>>() { - @Override - protected ArrayList<LockGraphNode> initialValue() { - return Lists.<LockGraphNode>newArrayListWithCapacity(3); - } - }; - - /** - * A Throwable used to record a stack trace that illustrates an example of - * a specific lock acquisition ordering. The top of the stack trace is - * truncated such that it starts with the acquisition of the lock in - * question, e.g. - * - * <pre> - * com...ExampleStackTrace: LockB -> LockC - * at com...CycleDetectingReentrantLock.lock(CycleDetectingLockFactory.java:443) - * at ... - * at ... - * at com...MyClass.someMethodThatAcquiresLockB(MyClass.java:123) - * </pre> - */ - private static class ExampleStackTrace extends IllegalStateException { - - static final StackTraceElement[] EMPTY_STACK_TRACE = - new StackTraceElement[0]; - - static Set<String> EXCLUDED_CLASS_NAMES = ImmutableSet.of( - CycleDetectingLockFactory.class.getName(), - ExampleStackTrace.class.getName(), - LockGraphNode.class.getName()); - - ExampleStackTrace(LockGraphNode node1, LockGraphNode node2) { - super(node1.getLockName() + " -> " + node2.getLockName()); - StackTraceElement[] origStackTrace = getStackTrace(); - for (int i = 0, n = origStackTrace.length; i < n; i++) { - if (WithExplicitOrdering.class.getName().equals( - origStackTrace[i].getClassName())) { - // For pre-populated disallowedPriorLocks edges, omit the stack trace. - setStackTrace(EMPTY_STACK_TRACE); - break; - } - if (!EXCLUDED_CLASS_NAMES.contains(origStackTrace[i].getClassName())) { - setStackTrace(Arrays.copyOfRange(origStackTrace, i, n)); - break; - } - } - } - } - - /** - * Represents a detected cycle in lock acquisition ordering. The exception - * includes a causal chain of {@code ExampleStackTrace} instances to illustrate the - * cycle, e.g. - * - * <pre> - * com....PotentialDeadlockException: Potential Deadlock from LockC -> ReadWriteA - * at ... - * at ... - * Caused by: com...ExampleStackTrace: LockB -> LockC - * at ... - * at ... - * Caused by: com...ExampleStackTrace: ReadWriteA -> LockB - * at ... - * at ... - * </pre> - * - * Instances are logged for the {@code Policies.WARN}, and thrown for - * {@code Policies.THROW}. - * - * @since 13.0 - */ - @Beta - public static final class PotentialDeadlockException - extends ExampleStackTrace { - - private final ExampleStackTrace conflictingStackTrace; - - private PotentialDeadlockException( - LockGraphNode node1, - LockGraphNode node2, - ExampleStackTrace conflictingStackTrace) { - super(node1, node2); - this.conflictingStackTrace = conflictingStackTrace; - initCause(conflictingStackTrace); - } - - public ExampleStackTrace getConflictingStackTrace() { - return conflictingStackTrace; - } - - /** - * Appends the chain of messages from the {@code conflictingStackTrace} to - * the original {@code message}. - */ - @Override - public String getMessage() { - StringBuilder message = new StringBuilder(super.getMessage()); - for (Throwable t = conflictingStackTrace; t != null; t = t.getCause()) { - message.append(", ").append(t.getMessage()); - } - return message.toString(); - } - } - - /** - * Internal Lock implementations implement the {@code CycleDetectingLock} - * interface, allowing the detection logic to treat all locks in the same - * manner. - */ - private interface CycleDetectingLock { - - /** @return the {@link LockGraphNode} associated with this lock. */ - LockGraphNode getLockGraphNode(); - - /** @return {@code true} if the current thread has acquired this lock. */ - boolean isAcquiredByCurrentThread(); - } - - /** - * A {@code LockGraphNode} associated with each lock instance keeps track of - * the directed edges in the lock acquisition graph. - */ - private static class LockGraphNode { - - /** - * The map tracking the locks that are known to be acquired before this - * lock, each associated with an example stack trace. Locks are weakly keyed - * to allow proper garbage collection when they are no longer referenced. - */ - final Map<LockGraphNode, ExampleStackTrace> allowedPriorLocks = - new MapMaker().weakKeys().makeMap(); - - /** - * The map tracking lock nodes that can cause a lock acquisition cycle if - * acquired before this node. - */ - final Map<LockGraphNode, PotentialDeadlockException> - disallowedPriorLocks = new MapMaker().weakKeys().makeMap(); - - final String lockName; - - LockGraphNode(String lockName) { - this.lockName = Preconditions.checkNotNull(lockName); - } - - String getLockName() { - return lockName; - } - - void checkAcquiredLocks( - Policy policy, List<LockGraphNode> acquiredLocks) { - for (int i = 0, size = acquiredLocks.size(); i < size; i++) { - checkAcquiredLock(policy, acquiredLocks.get(i)); - } - } - - /** - * Checks the acquisition-ordering between {@code this}, which is about to - * be acquired, and the specified {@code acquiredLock}. - * <p> - * When this method returns, the {@code acquiredLock} should be in either - * the {@code preAcquireLocks} map, for the case in which it is safe to - * acquire {@code this} after the {@code acquiredLock}, or in the - * {@code disallowedPriorLocks} map, in which case it is not safe. - */ - void checkAcquiredLock(Policy policy, LockGraphNode acquiredLock) { - // checkAcquiredLock() should never be invoked by a lock that has already - // been acquired. For unordered locks, aboutToAcquire() ensures this by - // checking isAcquiredByCurrentThread(). For ordered locks, however, this - // can happen because multiple locks may share the same LockGraphNode. In - // this situation, throw an IllegalStateException as defined by contract - // described in the documentation of WithExplicitOrdering. - Preconditions.checkState( - this != acquiredLock, - "Attempted to acquire multiple locks with the same rank " + - acquiredLock.getLockName()); - - if (allowedPriorLocks.containsKey(acquiredLock)) { - // The acquisition ordering from "acquiredLock" to "this" has already - // been verified as safe. In a properly written application, this is - // the common case. - return; - } - PotentialDeadlockException previousDeadlockException = - disallowedPriorLocks.get(acquiredLock); - if (previousDeadlockException != null) { - // Previously determined to be an unsafe lock acquisition. - // Create a new PotentialDeadlockException with the same causal chain - // (the example cycle) as that of the cached exception. - PotentialDeadlockException exception = new PotentialDeadlockException( - acquiredLock, this, - previousDeadlockException.getConflictingStackTrace()); - policy.handlePotentialDeadlock(exception); - return; - } - // Otherwise, it's the first time seeing this lock relationship. Look for - // a path from the acquiredLock to this. - Set<LockGraphNode> seen = Sets.newIdentityHashSet(); - ExampleStackTrace path = acquiredLock.findPathTo(this, seen); - - if (path == null) { - // this can be safely acquired after the acquiredLock. - // - // Note that there is a race condition here which can result in missing - // a cyclic edge: it's possible for two threads to simultaneous find - // "safe" edges which together form a cycle. Preventing this race - // condition efficiently without _introducing_ deadlock is probably - // tricky. For now, just accept the race condition---missing a warning - // now and then is still better than having no deadlock detection. - allowedPriorLocks.put( - acquiredLock, new ExampleStackTrace(acquiredLock, this)); - } else { - // Unsafe acquisition order detected. Create and cache a - // PotentialDeadlockException. - PotentialDeadlockException exception = - new PotentialDeadlockException(acquiredLock, this, path); - disallowedPriorLocks.put(acquiredLock, exception); - policy.handlePotentialDeadlock(exception); - } - } - - /** - * Performs a depth-first traversal of the graph edges defined by each - * node's {@code allowedPriorLocks} to find a path between {@code this} and - * the specified {@code lock}. - * - * @return If a path was found, a chained {@link ExampleStackTrace} - * illustrating the path to the {@code lock}, or {@code null} if no path - * was found. - */ - @Nullable - private ExampleStackTrace findPathTo( - LockGraphNode node, Set<LockGraphNode> seen) { - if (!seen.add(this)) { - return null; // Already traversed this node. - } - ExampleStackTrace found = allowedPriorLocks.get(node); - if (found != null) { - return found; // Found a path ending at the node! - } - // Recurse the edges. - for (Map.Entry<LockGraphNode, ExampleStackTrace> entry : - allowedPriorLocks.entrySet()) { - LockGraphNode preAcquiredLock = entry.getKey(); - found = preAcquiredLock.findPathTo(node, seen); - if (found != null) { - // One of this node's allowedPriorLocks found a path. Prepend an - // ExampleStackTrace(preAcquiredLock, this) to the returned chain of - // ExampleStackTraces. - ExampleStackTrace path = - new ExampleStackTrace(preAcquiredLock, this); - path.setStackTrace(entry.getValue().getStackTrace()); - path.initCause(found); - return path; - } - } - return null; - } - } - - /** - * CycleDetectingLock implementations must call this method before attempting - * to acquire the lock. - */ - private void aboutToAcquire(CycleDetectingLock lock) { - if (!lock.isAcquiredByCurrentThread()) { - ArrayList<LockGraphNode> acquiredLockList = acquiredLocks.get(); - LockGraphNode node = lock.getLockGraphNode(); - node.checkAcquiredLocks(policy, acquiredLockList); - acquiredLockList.add(node); - } - } - - /** - * CycleDetectingLock implementations must call this method in a - * {@code finally} clause after any attempt to change the lock state, - * including both lock and unlock attempts. Failure to do so can result in - * corrupting the acquireLocks set. - */ - private void lockStateChanged(CycleDetectingLock lock) { - if (!lock.isAcquiredByCurrentThread()) { - ArrayList<LockGraphNode> acquiredLockList = acquiredLocks.get(); - LockGraphNode node = lock.getLockGraphNode(); - // Iterate in reverse because locks are usually locked/unlocked in a - // LIFO order. - for (int i = acquiredLockList.size() - 1; i >=0; i--) { - if (acquiredLockList.get(i) == node) { - acquiredLockList.remove(i); - break; - } - } - } - } - - final class CycleDetectingReentrantLock - extends ReentrantLock implements CycleDetectingLock { - - private final LockGraphNode lockGraphNode; - - private CycleDetectingReentrantLock( - LockGraphNode lockGraphNode, boolean fair) { - super(fair); - this.lockGraphNode = Preconditions.checkNotNull(lockGraphNode); - } - - ///// CycleDetectingLock methods. ///// - - @Override - public LockGraphNode getLockGraphNode() { - return lockGraphNode; - } - - @Override - public boolean isAcquiredByCurrentThread() { - return isHeldByCurrentThread(); - } - - ///// Overridden ReentrantLock methods. ///// - - @Override - public void lock() { - aboutToAcquire(this); - try { - super.lock(); - } finally { - lockStateChanged(this); - } - } - - @Override - public void lockInterruptibly() throws InterruptedException { - aboutToAcquire(this); - try { - super.lockInterruptibly(); - } finally { - lockStateChanged(this); - } - } - - @Override - public boolean tryLock() { - aboutToAcquire(this); - try { - return super.tryLock(); - } finally { - lockStateChanged(this); - } - } - - @Override - public boolean tryLock(long timeout, TimeUnit unit) - throws InterruptedException { - aboutToAcquire(this); - try { - return super.tryLock(timeout, unit); - } finally { - lockStateChanged(this); - } - } - - @Override - public void unlock() { - try { - super.unlock(); - } finally { - lockStateChanged(this); - } - } - } - - final class CycleDetectingReentrantReadWriteLock - extends ReentrantReadWriteLock implements CycleDetectingLock { - - // These ReadLock/WriteLock implementations shadow those in the - // ReentrantReadWriteLock superclass. They are simply wrappers around the - // internal Sync object, so this is safe since the shadowed locks are never - // exposed or used. - private final CycleDetectingReentrantReadLock readLock; - private final CycleDetectingReentrantWriteLock writeLock; - - private final LockGraphNode lockGraphNode; - - private CycleDetectingReentrantReadWriteLock( - LockGraphNode lockGraphNode, boolean fair) { - super(fair); - this.readLock = new CycleDetectingReentrantReadLock(this); - this.writeLock = new CycleDetectingReentrantWriteLock(this); - this.lockGraphNode = Preconditions.checkNotNull(lockGraphNode); - } - - ///// Overridden ReentrantReadWriteLock methods. ///// - - @Override - public ReadLock readLock() { - return readLock; - } - - @Override - public WriteLock writeLock() { - return writeLock; - } - - ///// CycleDetectingLock methods. ///// - - @Override - public LockGraphNode getLockGraphNode() { - return lockGraphNode; - } - - @Override - public boolean isAcquiredByCurrentThread() { - return isWriteLockedByCurrentThread() || getReadHoldCount() > 0; - } - } - - private class CycleDetectingReentrantReadLock - extends ReentrantReadWriteLock.ReadLock { - - final CycleDetectingReentrantReadWriteLock readWriteLock; - - CycleDetectingReentrantReadLock( - CycleDetectingReentrantReadWriteLock readWriteLock) { - super(readWriteLock); - this.readWriteLock = readWriteLock; - } - - @Override - public void lock() { - aboutToAcquire(readWriteLock); - try { - super.lock(); - } finally { - lockStateChanged(readWriteLock); - } - } - - @Override - public void lockInterruptibly() throws InterruptedException { - aboutToAcquire(readWriteLock); - try { - super.lockInterruptibly(); - } finally { - lockStateChanged(readWriteLock); - } - } - - @Override - public boolean tryLock() { - aboutToAcquire(readWriteLock); - try { - return super.tryLock(); - } finally { - lockStateChanged(readWriteLock); - } - } - - @Override - public boolean tryLock(long timeout, TimeUnit unit) - throws InterruptedException { - aboutToAcquire(readWriteLock); - try { - return super.tryLock(timeout, unit); - } finally { - lockStateChanged(readWriteLock); - } - } - - @Override - public void unlock() { - try { - super.unlock(); - } finally { - lockStateChanged(readWriteLock); - } - } - } - - private class CycleDetectingReentrantWriteLock - extends ReentrantReadWriteLock.WriteLock { - - final CycleDetectingReentrantReadWriteLock readWriteLock; - - CycleDetectingReentrantWriteLock( - CycleDetectingReentrantReadWriteLock readWriteLock) { - super(readWriteLock); - this.readWriteLock = readWriteLock; - } - - @Override - public void lock() { - aboutToAcquire(readWriteLock); - try { - super.lock(); - } finally { - lockStateChanged(readWriteLock); - } - } - - @Override - public void lockInterruptibly() throws InterruptedException { - aboutToAcquire(readWriteLock); - try { - super.lockInterruptibly(); - } finally { - lockStateChanged(readWriteLock); - } - } - - @Override - public boolean tryLock() { - aboutToAcquire(readWriteLock); - try { - return super.tryLock(); - } finally { - lockStateChanged(readWriteLock); - } - } - - @Override - public boolean tryLock(long timeout, TimeUnit unit) - throws InterruptedException { - aboutToAcquire(readWriteLock); - try { - return super.tryLock(timeout, unit); - } finally { - lockStateChanged(readWriteLock); - } - } - - @Override - public void unlock() { - try { - super.unlock(); - } finally { - lockStateChanged(readWriteLock); - } - } - } -} diff --git a/guava/src/com/google/common/util/concurrent/ExecutionError.java b/guava/src/com/google/common/util/concurrent/ExecutionError.java index e462969..ce588eb 100644 --- a/guava/src/com/google/common/util/concurrent/ExecutionError.java +++ b/guava/src/com/google/common/util/concurrent/ExecutionError.java @@ -17,8 +17,7 @@ package com.google.common.util.concurrent; import com.google.common.annotations.GwtCompatible; - -import javax.annotation.Nullable; +import com.google.common.annotations.Beta; /** * {@link Error} variant of {@link java.util.concurrent.ExecutionException}. As @@ -32,6 +31,7 @@ import javax.annotation.Nullable; * @author Chris Povirk * @since 10.0 */ +@Beta @GwtCompatible public class ExecutionError extends Error { /** @@ -42,21 +42,21 @@ public class ExecutionError extends Error { /** * Creates a new instance with the given detail message. */ - protected ExecutionError(@Nullable String message) { + protected ExecutionError(String message) { super(message); } /** * Creates a new instance with the given detail message and cause. */ - public ExecutionError(@Nullable String message, @Nullable Error cause) { + public ExecutionError(String message, Error cause) { super(message, cause); } /** * Creates a new instance with the given cause. */ - public ExecutionError(@Nullable Error cause) { + public ExecutionError(Error cause) { super(cause); } diff --git a/guava/src/com/google/common/util/concurrent/ExecutionList.java b/guava/src/com/google/common/util/concurrent/ExecutionList.java index e1a40d0..d1b78f5 100644 --- a/guava/src/com/google/common/util/concurrent/ExecutionList.java +++ b/guava/src/com/google/common/util/concurrent/ExecutionList.java @@ -16,7 +16,6 @@ package com.google.common.util.concurrent; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -46,7 +45,7 @@ import java.util.logging.Logger; public final class ExecutionList { // Logger to log exceptions caught when running runnables. - @VisibleForTesting static final Logger log = + private static final Logger log = Logger.getLogger(ExecutionList.class.getName()); // The runnable,executor pairs to execute. diff --git a/guava/src/com/google/common/util/concurrent/FakeTimeLimiter.java b/guava/src/com/google/common/util/concurrent/FakeTimeLimiter.java index 75a1e94..890479d 100644 --- a/guava/src/com/google/common/util/concurrent/FakeTimeLimiter.java +++ b/guava/src/com/google/common/util/concurrent/FakeTimeLimiter.java @@ -16,8 +16,6 @@ package com.google.common.util.concurrent; -import static com.google.common.base.Preconditions.checkNotNull; - import com.google.common.annotations.Beta; import java.util.concurrent.Callable; @@ -38,16 +36,12 @@ public final class FakeTimeLimiter implements TimeLimiter { @Override public <T> T newProxy(T target, Class<T> interfaceType, long timeoutDuration, TimeUnit timeoutUnit) { - checkNotNull(target); - checkNotNull(interfaceType); - checkNotNull(timeoutUnit); return target; // ha ha } @Override public <T> T callWithTimeout(Callable<T> callable, long timeoutDuration, TimeUnit timeoutUnit, boolean amInterruptible) throws Exception { - checkNotNull(timeoutUnit); return callable.call(); // fooled you } } diff --git a/guava/src/com/google/common/util/concurrent/ForwardingService.java b/guava/src/com/google/common/util/concurrent/ForwardingService.java index 8774232..e39e4db 100644 --- a/guava/src/com/google/common/util/concurrent/ForwardingService.java +++ b/guava/src/com/google/common/util/concurrent/ForwardingService.java @@ -19,22 +19,13 @@ package com.google.common.util.concurrent; import com.google.common.annotations.Beta; import com.google.common.collect.ForwardingObject; -import java.util.concurrent.Executor; - /** * A {@link Service} that forwards all method calls to another service. * - * @deprecated Instead of using a {@link ForwardingService}, consider using the - * {@link Service.Listener} functionality to hook into the {@link Service} - * lifecycle, or if you really do need to provide access to some Service - * methods, consider just providing the few that you actually need (e.g. just - * {@link #startAndWait()}) and not implementing Service. - * * @author Chris Nokleberg * @since 1.0 */ @Beta -@Deprecated public abstract class ForwardingService extends ForwardingObject implements Service { @@ -66,20 +57,6 @@ public abstract class ForwardingService extends ForwardingObject @Override public boolean isRunning() { return delegate().isRunning(); } - - /** - * @since 13.0 - */ - @Override public void addListener(Listener listener, Executor executor) { - delegate().addListener(listener, executor); - } - - /** - * @since 14.0 - */ - @Override public Throwable failureCause() { - return delegate().failureCause(); - } /** * A sensible default implementation of {@link #startAndWait()}, in terms of diff --git a/guava/src/com/google/common/util/concurrent/FutureCallback.java b/guava/src/com/google/common/util/concurrent/FutureCallback.java index 735d6ab..7b39d4a 100644 --- a/guava/src/com/google/common/util/concurrent/FutureCallback.java +++ b/guava/src/com/google/common/util/concurrent/FutureCallback.java @@ -16,6 +16,8 @@ package com.google.common.util.concurrent; +import com.google.common.annotations.Beta; + import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -28,6 +30,7 @@ import java.util.concurrent.Future; * @author Anthony Zana * @since 10.0 */ +@Beta public interface FutureCallback<V> { /** * Invoked with the result of the {@code Future} computation when it is diff --git a/guava/src/com/google/common/util/concurrent/FutureFallback.java b/guava/src/com/google/common/util/concurrent/FutureFallback.java deleted file mode 100644 index 7d03c67..0000000 --- a/guava/src/com/google/common/util/concurrent/FutureFallback.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (C) 2011 The Guava Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.common.util.concurrent; - -import com.google.common.annotations.Beta; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -/** - * Provides a backup {@code Future} to replace an earlier failed {@code Future}. - * An implementation of this interface can be applied to an input {@code Future} - * with {@link Futures#withFallback}. - * - * @param <V> the result type of the provided backup {@code Future} - * - * @author Bruno Diniz - * @since 14.0 - */ -@Beta -public interface FutureFallback<V> { - /** - * Returns a {@code Future} to be used in place of the {@code Future} that - * failed with the given exception. The exception is provided so that the - * {@code Fallback} implementation can conditionally determine whether to - * propagate the exception or to attempt to recover. - * - * @param t the exception that made the future fail. If the future's {@link - * Future#get() get} method throws an {@link ExecutionException}, then the - * cause is passed to this method. Any other thrown object is passed - * unaltered. - */ - ListenableFuture<V> create(Throwable t) throws Exception; -} diff --git a/guava/src/com/google/common/util/concurrent/Futures.java b/guava/src/com/google/common/util/concurrent/Futures.java index aad6b43..dc703c5 100644 --- a/guava/src/com/google/common/util/concurrent/Futures.java +++ b/guava/src/com/google/common/util/concurrent/Futures.java @@ -21,14 +21,15 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor; import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; +import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly; +import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly; import static java.lang.Thread.currentThread; import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import com.google.common.annotations.Beta; import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; @@ -38,27 +39,22 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.UndeclaredThrowableException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; -import java.util.logging.Logger; import javax.annotation.Nullable; /** * Static utility methods pertaining to the {@link Future} interface. * - * <p>Many of these methods use the {@link ListenableFuture} API; consult the - * Guava User Guide article on <a href= - * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained"> - * {@code ListenableFuture}</a>. - * * @author Kevin Bourrillion * @author Nishant Thakkar * @author Sven Mawson @@ -75,7 +71,7 @@ public final class Futures { * * <p>The given mapping function will be applied to an * {@link InterruptedException}, a {@link CancellationException}, or an - * {@link ExecutionException}. + * {@link ExecutionException} with the actual cause of the exception. * See {@link Future#get()} for details on the exceptions thrown. * * @since 9.0 (source-compatible since 1.0) @@ -85,151 +81,6 @@ public final class Futures { return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper); } - private abstract static class ImmediateFuture<V> - implements ListenableFuture<V> { - - private static final Logger log = - Logger.getLogger(ImmediateFuture.class.getName()); - - @Override - public void addListener(Runnable listener, Executor executor) { - checkNotNull(listener, "Runnable was null."); - checkNotNull(executor, "Executor was null."); - try { - executor.execute(listener); - } catch (RuntimeException e) { - // ListenableFuture's contract is that it will not throw unchecked - // exceptions, so log the bad runnable and/or executor and swallow it. - log.log(Level.SEVERE, "RuntimeException while executing runnable " - + listener + " with executor " + executor, e); - } - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; - } - - @Override - public abstract V get() throws ExecutionException; - - @Override - public V get(long timeout, TimeUnit unit) throws ExecutionException { - checkNotNull(unit); - return get(); - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return true; - } - } - - private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> { - - @Nullable private final V value; - - ImmediateSuccessfulFuture(@Nullable V value) { - this.value = value; - } - - @Override - public V get() { - return value; - } - } - - private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception> - extends ImmediateFuture<V> implements CheckedFuture<V, X> { - - @Nullable private final V value; - - ImmediateSuccessfulCheckedFuture(@Nullable V value) { - this.value = value; - } - - @Override - public V get() { - return value; - } - - @Override - public V checkedGet() { - return value; - } - - @Override - public V checkedGet(long timeout, TimeUnit unit) { - checkNotNull(unit); - return value; - } - } - - private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> { - - private final Throwable thrown; - - ImmediateFailedFuture(Throwable thrown) { - this.thrown = thrown; - } - - @Override - public V get() throws ExecutionException { - throw new ExecutionException(thrown); - } - } - - private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> { - - private final CancellationException thrown; - - ImmediateCancelledFuture() { - this.thrown = new CancellationException("Immediate cancelled future."); - } - - @Override - public boolean isCancelled() { - return true; - } - - @Override - public V get() { - throw AbstractFuture.cancellationExceptionWithCause( - "Task was cancelled.", thrown); - } - } - - private static class ImmediateFailedCheckedFuture<V, X extends Exception> - extends ImmediateFuture<V> implements CheckedFuture<V, X> { - - private final X thrown; - - ImmediateFailedCheckedFuture(X thrown) { - this.thrown = thrown; - } - - @Override - public V get() throws ExecutionException { - throw new ExecutionException(thrown); - } - - @Override - public V checkedGet() throws X { - throw thrown; - } - - @Override - public V checkedGet(long timeout, TimeUnit unit) throws X { - checkNotNull(unit); - throw thrown; - } - } - /** * Creates a {@code ListenableFuture} which has its value set immediately upon * construction. The getters just return the value. This {@code Future} can't @@ -237,7 +88,9 @@ public final class Futures { * {@code true}. */ public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { - return new ImmediateSuccessfulFuture<V>(value); + SettableFuture<V> future = SettableFuture.create(); + future.set(value); + return future; } /** @@ -250,7 +103,14 @@ public final class Futures { */ public static <V, X extends Exception> CheckedFuture<V, X> immediateCheckedFuture(@Nullable V value) { - return new ImmediateSuccessfulCheckedFuture<V, X>(value); + SettableFuture<V> future = SettableFuture.create(); + future.set(value); + return Futures.makeChecked(future, new Function<Exception, X>() { + @Override + public X apply(Exception e) { + throw new AssertionError("impossible"); + } + }); } /** @@ -261,21 +121,15 @@ public final class Futures { * method always returns {@code true}. Calling {@code get()} will immediately * throw the provided {@code Throwable} wrapped in an {@code * ExecutionException}. + * + * @throws Error if the throwable is an {@link Error}. */ public static <V> ListenableFuture<V> immediateFailedFuture( Throwable throwable) { checkNotNull(throwable); - return new ImmediateFailedFuture<V>(throwable); - } - - /** - * Creates a {@code ListenableFuture} which is cancelled immediately upon - * construction, so that {@code isCancelled()} always returns {@code true}. - * - * @since 14.0 - */ - public static <V> ListenableFuture<V> immediateCancelledFuture() { - return new ImmediateCancelledFuture<V>(); + SettableFuture<V> future = SettableFuture.create(); + future.setException(throwable); + return future; } /** @@ -284,224 +138,149 @@ public final class Futures { * * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} * method always returns {@code true}. Calling {@code get()} will immediately - * throw the provided {@code Exception} wrapped in an {@code + * throw the provided {@code Throwable} wrapped in an {@code * ExecutionException}, and calling {@code checkedGet()} will throw the * provided exception itself. + * + * @throws Error if the throwable is an {@link Error}. */ public static <V, X extends Exception> CheckedFuture<V, X> - immediateFailedCheckedFuture(X exception) { + immediateFailedCheckedFuture(final X exception) { checkNotNull(exception); - return new ImmediateFailedCheckedFuture<V, X>(exception); + return makeChecked(Futures.<V>immediateFailedFuture(exception), + new Function<Exception, X>() { + @Override + public X apply(Exception e) { + return exception; + } + }); } /** - * Returns a {@code Future} whose result is taken from the given primary - * {@code input} or, if the primary input fails, from the {@code Future} - * provided by the {@code fallback}. {@link FutureFallback#create} is not - * invoked until the primary input has failed, so if the primary input - * succeeds, it is never invoked. If, during the invocation of {@code - * fallback}, an exception is thrown, this exception is used as the result of - * the output {@code Future}. - * - * <p>Below is an example of a fallback that returns a default value if an - * exception occurs: - * - * <pre> {@code - * ListenableFuture<Integer> fetchCounterFuture = ...; - * - * // Falling back to a zero counter in case an exception happens when - * // processing the RPC to fetch counters. - * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( - * fetchCounterFuture, new FutureFallback<Integer>() { - * public ListenableFuture<Integer> create(Throwable t) { - * // Returning "0" as the default for the counter when the - * // exception happens. - * return immediateFuture(0); - * } - * }); - * }</pre> - * - * The fallback can also choose to propagate the original exception when - * desired: + * <p>Returns a new {@code ListenableFuture} whose result is asynchronously + * derived from the result of the given {@code Future}. More precisely, the + * returned {@code Future} takes its result from a {@code Future} produced by + * applying the given {@code Function} to the result of the original {@code + * Future}. Example: * * <pre> {@code - * ListenableFuture<Integer> fetchCounterFuture = ...; - * - * // Falling back to a zero counter only in case the exception was a - * // TimeoutException. - * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( - * fetchCounterFuture, new FutureFallback<Integer>() { - * public ListenableFuture<Integer> create(Throwable t) { - * if (t instanceof TimeoutException) { - * return immediateFuture(0); - * } - * return immediateFailedFuture(t); + * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); + * Function<RowKey, ListenableFuture<QueryResult>> queryFunction = + * new Function<RowKey, ListenableFuture<QueryResult>>() { + * public ListenableFuture<QueryResult> apply(RowKey rowKey) { + * return dataService.read(rowKey); * } - * }); + * }; + * ListenableFuture<QueryResult> queryFuture = + * chain(rowKeyFuture, queryFunction); * }</pre> * - * Note: If the derived {@code Future} is slow or heavyweight to create - * (whether the {@code Future} itself is slow or heavyweight to complete is - * irrelevant), consider {@linkplain #withFallback(ListenableFuture, - * FutureFallback, Executor) supplying an executor}. If you do not supply an - * executor, {@code withFallback} will use {@link - * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some - * caveats for heavier operations. For example, the call to {@code - * fallback.create} may run on an unpredictable or undesirable thread: - * - * <ul> - * <li>If the input {@code Future} is done at the time {@code withFallback} - * is called, {@code withFallback} will call {@code fallback.create} inline. - * <li>If the input {@code Future} is not yet done, {@code withFallback} will - * schedule {@code fallback.create} to be run by the thread that completes - * the input {@code Future}, which may be an internal system thread such as - * an RPC network thread. - * </ul> + * <p>Note: This overload of {@code chain} is designed for cases in which the + * work of creating the derived future is fast and lightweight, as the method + * does not accept an {@code Executor} in which to perform the the work. For + * heavier derivations, this overload carries some caveats: First, the thread + * that the derivation runs in depends on whether the input {@code Future} is + * done at the time {@code chain} is called. In particular, if called late, + * {@code chain} will run the derivation in the thread that called {@code + * chain}. Second, derivations may run in an internal thread of the system + * responsible for the input {@code Future}, such as an RPC network thread. + * Finally, during the execution of a {@code sameThreadExecutor} {@code + * chain} function, all other registered but unexecuted listeners are + * prevented from running, even if those listeners are to run in other + * executors. * - * Also note that, regardless of which thread executes {@code - * fallback.create}, all other registered but unexecuted listeners are - * prevented from running during its execution, even if those listeners are - * to run in other executors. + * <p>The returned {@code Future} attempts to keep its cancellation state in + * sync with that of the input future and that of the future returned by the + * chain function. That is, if the returned {@code Future} is cancelled, it + * will attempt to cancel the other two, and if either of the other two is + * cancelled, the returned {@code Future} will receive a callback in which it + * will attempt to cancel itself. * - * @param input the primary input {@code Future} - * @param fallback the {@link FutureFallback} implementation to be called if - * {@code input} fails - * @since 14.0 + * @param input The future to chain + * @param function A function to chain the results of the provided future + * to the results of the returned future. This will be run in the thread + * that notifies input it is complete. + * @return A future that holds result of the chain. + * @deprecated Convert your {@code Function} to a {@code AsyncFunction}, and + * use {@link #transform(ListenableFuture, AsyncFunction)}. This method is + * scheduled to be removed from Guava in Guava release 12.0. */ - public static <V> ListenableFuture<V> withFallback( - ListenableFuture<? extends V> input, - FutureFallback<? extends V> fallback) { - return withFallback(input, fallback, sameThreadExecutor()); + @Deprecated + public static <I, O> ListenableFuture<O> chain( + ListenableFuture<I> input, + Function<? super I, ? extends ListenableFuture<? extends O>> function) { + return chain(input, function, MoreExecutors.sameThreadExecutor()); } /** - * Returns a {@code Future} whose result is taken from the given primary - * {@code input} or, if the primary input fails, from the {@code Future} - * provided by the {@code fallback}. {@link FutureFallback#create} is not - * invoked until the primary input has failed, so if the primary input - * succeeds, it is never invoked. If, during the invocation of {@code - * fallback}, an exception is thrown, this exception is used as the result of - * the output {@code Future}. - * - * <p>Below is an example of a fallback that returns a default value if an - * exception occurs: + * <p>Returns a new {@code ListenableFuture} whose result is asynchronously + * derived from the result of the given {@code Future}. More precisely, the + * returned {@code Future} takes its result from a {@code Future} produced by + * applying the given {@code Function} to the result of the original {@code + * Future}. Example: * * <pre> {@code - * ListenableFuture<Integer> fetchCounterFuture = ...; - * - * // Falling back to a zero counter in case an exception happens when - * // processing the RPC to fetch counters. - * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( - * fetchCounterFuture, new FutureFallback<Integer>() { - * public ListenableFuture<Integer> create(Throwable t) { - * // Returning "0" as the default for the counter when the - * // exception happens. - * return immediateFuture(0); + * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); + * Function<RowKey, ListenableFuture<QueryResult>> queryFunction = + * new Function<RowKey, ListenableFuture<QueryResult>>() { + * public ListenableFuture<QueryResult> apply(RowKey rowKey) { + * return dataService.read(rowKey); * } - * }, sameThreadExecutor()); + * }; + * ListenableFuture<QueryResult> queryFuture = + * chain(rowKeyFuture, queryFunction, executor); * }</pre> * - * The fallback can also choose to propagate the original exception when - * desired: - * - * <pre> {@code - * ListenableFuture<Integer> fetchCounterFuture = ...; - * - * // Falling back to a zero counter only in case the exception was a - * // TimeoutException. - * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( - * fetchCounterFuture, new FutureFallback<Integer>() { - * public ListenableFuture<Integer> create(Throwable t) { - * if (t instanceof TimeoutException) { - * return immediateFuture(0); - * } - * return immediateFailedFuture(t); - * } - * }, sameThreadExecutor()); - * }</pre> + * <p>The returned {@code Future} attempts to keep its cancellation state in + * sync with that of the input future and that of the future returned by the + * chain function. That is, if the returned {@code Future} is cancelled, it + * will attempt to cancel the other two, and if either of the other two is + * cancelled, the returned {@code Future} will receive a callback in which it + * will attempt to cancel itself. * - * When the execution of {@code fallback.create} is fast and lightweight - * (though the {@code Future} it returns need not meet these criteria), - * consider {@linkplain #withFallback(ListenableFuture, FutureFallback) - * omitting the executor} or explicitly specifying {@code - * sameThreadExecutor}. However, be aware of the caveats documented in the - * link above. - * - * @param input the primary input {@code Future} - * @param fallback the {@link FutureFallback} implementation to be called if - * {@code input} fails - * @param executor the executor that runs {@code fallback} if {@code input} - * fails - * @since 14.0 - */ - public static <V> ListenableFuture<V> withFallback( - ListenableFuture<? extends V> input, - FutureFallback<? extends V> fallback, Executor executor) { - checkNotNull(fallback); - return new FallbackFuture<V>(input, fallback, executor); - } - - /** - * A future that falls back on a second, generated future, in case its - * original future fails. + * <p>Note: For cases in which the work of creating the derived future is + * fast and lightweight, consider {@linkplain Futures#chain(ListenableFuture, + * Function) the other overload} or explicit use of {@code + * sameThreadExecutor}. For heavier derivations, this choice carries some + * caveats: First, the thread that the derivation runs in depends on whether + * the input {@code Future} is done at the time {@code chain} is called. In + * particular, if called late, {@code chain} will run the derivation in the + * thread that called {@code chain}. Second, derivations may run in an + * internal thread of the system responsible for the input {@code Future}, + * such as an RPC network thread. Finally, during the execution of a {@code + * sameThreadExecutor} {@code chain} function, all other registered but + * unexecuted listeners are prevented from running, even if those listeners + * are to run in other executors. + * + * @param input The future to chain + * @param function A function to chain the results of the provided future + * to the results of the returned future. + * @param executor Executor to run the function in. + * @return A future that holds result of the chain. + * @deprecated Convert your {@code Function} to a {@code AsyncFunction}, and + * use {@link #transform(ListenableFuture, AsyncFunction, Executor)}. This + * method is scheduled to be removed from Guava in Guava release 12.0. */ - private static class FallbackFuture<V> extends AbstractFuture<V> { - - private volatile ListenableFuture<? extends V> running; - - FallbackFuture(ListenableFuture<? extends V> input, - final FutureFallback<? extends V> fallback, - final Executor executor) { - running = input; - addCallback(running, new FutureCallback<V>() { - @Override - public void onSuccess(V value) { - set(value); - } - - @Override - public void onFailure(Throwable t) { - if (isCancelled()) { - return; - } - try { - running = fallback.create(t); - if (isCancelled()) { // in case cancel called in the meantime - running.cancel(wasInterrupted()); - return; - } - addCallback(running, new FutureCallback<V>() { - @Override - public void onSuccess(V value) { - set(value); - } - - @Override - public void onFailure(Throwable t) { - if (running.isCancelled()) { - cancel(false); - } else { - setException(t); - } - } - }, sameThreadExecutor()); - } catch (Exception e) { - setException(e); - } catch (Error e) { - setException(e); // note: rethrows + @Deprecated + public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input, + final Function<? super I, ? extends ListenableFuture<? extends O>> + function, + Executor executor) { + checkNotNull(function); + ChainingListenableFuture<I, O> chain = + new ChainingListenableFuture<I, O>(new AsyncFunction<I, O>() { + @Override + /* + * All methods of ListenableFuture are covariant, and we don't expose + * the object anywhere that would allow it to be downcast. + */ + @SuppressWarnings("unchecked") + public ListenableFuture<O> apply(I input) { + return (ListenableFuture) function.apply(input); } - } - }, executor); - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if (super.cancel(mayInterruptIfRunning)) { - running.cancel(mayInterruptIfRunning); - return true; - } - return false; - } + }, input); + input.addListener(chain, executor); + return chain; } /** @@ -523,28 +302,20 @@ public final class Futures { * transform(rowKeyFuture, queryFunction); * }</pre> * - * Note: If the derived {@code Future} is slow or heavyweight to create - * (whether the {@code Future} itself is slow or heavyweight to complete is - * irrelevant), consider {@linkplain #transform(ListenableFuture, - * AsyncFunction, Executor) supplying an executor}. If you do not supply an - * executor, {@code transform} will use {@link - * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some - * caveats for heavier operations. For example, the call to {@code - * function.apply} may run on an unpredictable or undesirable thread: - * - * <ul> - * <li>If the input {@code Future} is done at the time {@code transform} is - * called, {@code transform} will call {@code function.apply} inline. - * <li>If the input {@code Future} is not yet done, {@code transform} will - * schedule {@code function.apply} to be run by the thread that completes the - * input {@code Future}, which may be an internal system thread such as an - * RPC network thread. - * </ul> - * - * Also note that, regardless of which thread executes {@code - * function.apply}, all other registered but unexecuted listeners are - * prevented from running during its execution, even if those listeners are - * to run in other executors. + * <p>Note: This overload of {@code transform} is designed for cases in which + * the work of creating the derived {@code Future} is fast and lightweight, + * as the method does not accept an {@code Executor} in which to perform the + * the work. (The created {@code Future} itself need not complete quickly.) + * For heavier operations, this overload carries some caveats: First, the + * thread that {@code function.apply} runs in depends on whether the input + * {@code Future} is done at the time {@code transform} is called. In + * particular, if called late, {@code transform} will run the operation in + * the thread that called {@code transform}. Second, {@code function.apply} + * may run in an internal thread of the system responsible for the input + * {@code Future}, such as an RPC network thread. Finally, during the + * execution of a {@code sameThreadExecutor} {@code function.apply}, all + * other registered but unexecuted listeners are prevented from running, even + * if those listeners are to run in other executors. * * <p>The returned {@code Future} attempts to keep its cancellation state in * sync with that of the input future and that of the future returned by the @@ -591,11 +362,20 @@ public final class Futures { * cancelled, the returned {@code Future} will receive a callback in which it * will attempt to cancel itself. * - * <p>When the execution of {@code function.apply} is fast and lightweight - * (though the {@code Future} it returns need not meet these criteria), - * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting - * the executor} or explicitly specifying {@code sameThreadExecutor}. - * However, be aware of the caveats documented in the link above. + * <p>Note: For cases in which the work of creating the derived future is + * fast and lightweight, consider {@linkplain + * Futures#transform(ListenableFuture, Function) the other overload} or + * explicit use of {@code sameThreadExecutor}. For heavier derivations, this + * choice carries some caveats: First, the thread that {@code function.apply} + * runs in depends on whether the input {@code Future} is done at the time + * {@code transform} is called. In particular, if called late, {@code + * transform} will run the operation in the thread that called {@code + * transform}. Second, {@code function.apply} may run in an internal thread + * of the system responsible for the input {@code Future}, such as an RPC + * network thread. Finally, during the execution of a {@code + * sameThreadExecutor} {@code function.apply}, all other registered but + * unexecuted listeners are prevented from running, even if those listeners + * are to run in other executors. * * @param input The future to transform * @param function A function to transform the result of the input future @@ -631,26 +411,19 @@ public final class Futures { * transform(queryFuture, rowsFunction); * }</pre> * - * Note: If the transformation is slow or heavyweight, consider {@linkplain - * #transform(ListenableFuture, Function, Executor) supplying an executor}. - * If you do not supply an executor, {@code transform} will use {@link - * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some - * caveats for heavier operations. For example, the call to {@code - * function.apply} may run on an unpredictable or undesirable thread: - * - * <ul> - * <li>If the input {@code Future} is done at the time {@code transform} is - * called, {@code transform} will call {@code function.apply} inline. - * <li>If the input {@code Future} is not yet done, {@code transform} will - * schedule {@code function.apply} to be run by the thread that completes the - * input {@code Future}, which may be an internal system thread such as an - * RPC network thread. - * </ul> - * - * Also note that, regardless of which thread executes {@code - * function.apply}, all other registered but unexecuted listeners are - * prevented from running during its execution, even if those listeners are - * to run in other executors. + * <p>Note: This overload of {@code transform} is designed for cases in which + * the transformation is fast and lightweight, as the method does not accept + * an {@code Executor} in which to perform the the work. For heavier + * transformations, this overload carries some caveats: First, the thread + * that the transformation runs in depends on whether the input {@code + * Future} is done at the time {@code transform} is called. In particular, if + * called late, {@code transform} will perform the transformation in the + * thread that called {@code transform}. Second, transformations may run in + * an internal thread of the system responsible for the input {@code Future}, + * such as an RPC network thread. Finally, during the execution of a {@code + * sameThreadExecutor} transformation, all other registered but unexecuted + * listeners are prevented from running, even if those listeners are to run + * in other executors. * * <p>The returned {@code Future} attempts to keep its cancellation state in * sync with that of the input future. That is, if the returned {@code Future} @@ -661,16 +434,16 @@ public final class Futures { * <p>An example use of this method is to convert a serializable object * returned from an RPC into a POJO. * - * @param input The future to transform + * @param future The future to transform * @param function A Function to transform the results of the provided future * to the results of the returned future. This will be run in the thread * that notifies input it is complete. * @return A future that holds result of the transformation. * @since 9.0 (in 1.0 as {@code compose}) */ - public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, + public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future, final Function<? super I, ? extends O> function) { - return transform(input, function, MoreExecutors.sameThreadExecutor()); + return transform(future, function, MoreExecutors.sameThreadExecutor()); } /** @@ -699,29 +472,39 @@ public final class Futures { * <p>An example use of this method is to convert a serializable object * returned from an RPC into a POJO. * - * <p>When the transformation is fast and lightweight, consider {@linkplain - * #transform(ListenableFuture, Function) omitting the executor} or - * explicitly specifying {@code sameThreadExecutor}. However, be aware of the - * caveats documented in the link above. - * - * @param input The future to transform + * <p>Note: For cases in which the transformation is fast and lightweight, + * consider {@linkplain Futures#transform(ListenableFuture, Function) the + * other overload} or explicit use of {@link + * MoreExecutors#sameThreadExecutor}. For heavier transformations, this + * choice carries some caveats: First, the thread that the transformation + * runs in depends on whether the input {@code Future} is done at the time + * {@code transform} is called. In particular, if called late, {@code + * transform} will perform the transformation in the thread that called + * {@code transform}. Second, transformations may run in an internal thread + * of the system responsible for the input {@code Future}, such as an RPC + * network thread. Finally, during the execution of a {@code + * sameThreadExecutor} transformation, all other registered but unexecuted + * listeners are prevented from running, even if those listeners are to run + * in other executors. + * + * @param future The future to transform * @param function A Function to transform the results of the provided future * to the results of the returned future. * @param executor Executor to run the function in. * @return A future that holds result of the transformation. * @since 9.0 (in 2.0 as {@code compose}) */ - public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, + public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future, final Function<? super I, ? extends O> function, Executor executor) { checkNotNull(function); - AsyncFunction<I, O> wrapperFunction - = new AsyncFunction<I, O>() { + Function<I, ListenableFuture<O>> wrapperFunction + = new Function<I, ListenableFuture<O>>() { @Override public ListenableFuture<O> apply(I input) { O output = function.apply(input); return immediateFuture(output); } }; - return transform(input, wrapperFunction, executor); + return chain(future, wrapperFunction, executor); } /** @@ -741,43 +524,43 @@ public final class Futures { * who don't have a {@code ListenableFuture} available and * do not mind repeated, lazy function evaluation. * - * @param input The future to transform + * @param future The future to transform * @param function A Function to transform the results of the provided future * to the results of the returned future. * @return A future that returns the result of the transformation. * @since 10.0 */ @Beta - public static <I, O> Future<O> lazyTransform(final Future<I> input, + public static <I, O> Future<O> lazyTransform(final Future<I> future, final Function<? super I, ? extends O> function) { - checkNotNull(input); + checkNotNull(future); checkNotNull(function); return new Future<O>() { @Override public boolean cancel(boolean mayInterruptIfRunning) { - return input.cancel(mayInterruptIfRunning); + return future.cancel(mayInterruptIfRunning); } @Override public boolean isCancelled() { - return input.isCancelled(); + return future.isCancelled(); } @Override public boolean isDone() { - return input.isDone(); + return future.isDone(); } @Override public O get() throws InterruptedException, ExecutionException { - return applyTransformation(input.get()); + return applyTransformation(future.get()); } @Override public O get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return applyTransformation(input.get(timeout, unit)); + return applyTransformation(future.get(timeout, unit)); } private O applyTransformation(I input) throws ExecutionException { @@ -806,6 +589,8 @@ public final class Futures { private AsyncFunction<? super I, ? extends O> function; private ListenableFuture<? extends I> inputFuture; private volatile ListenableFuture<? extends O> outputFuture; + private final BlockingQueue<Boolean> mayInterruptIfRunningChannel = + new LinkedBlockingQueue<Boolean>(1); private final CountDownLatch outputCreated = new CountDownLatch(1); private ChainingListenableFuture( @@ -815,6 +600,90 @@ public final class Futures { this.inputFuture = checkNotNull(inputFuture); } + /** + * Delegate the get() to the input and output futures, in case + * their implementations defer starting computation until their + * own get() is invoked. + */ + @Override + public O get() throws InterruptedException, ExecutionException { + if (!isDone()) { + // Invoking get on the inputFuture will ensure our own run() + // method below is invoked as a listener when inputFuture sets + // its value. Therefore when get() returns we should then see + // the outputFuture be created. + ListenableFuture<? extends I> inputFuture = this.inputFuture; + if (inputFuture != null) { + inputFuture.get(); + } + + // If our listener was scheduled to run on an executor we may + // need to wait for our listener to finish running before the + // outputFuture has been constructed by the function. + outputCreated.await(); + + // Like above with the inputFuture, we have a listener on + // the outputFuture that will set our own value when its + // value is set. Invoking get will ensure the output can + // complete and invoke our listener, so that we can later + // get the result. + ListenableFuture<? extends O> outputFuture = this.outputFuture; + if (outputFuture != null) { + outputFuture.get(); + } + } + return super.get(); + } + + /** + * Delegate the get() to the input and output futures, in case + * their implementations defer starting computation until their + * own get() is invoked. + */ + @Override + public O get(long timeout, TimeUnit unit) throws TimeoutException, + ExecutionException, InterruptedException { + if (!isDone()) { + // Use a single time unit so we can decrease remaining timeout + // as we wait for various phases to complete. + if (unit != NANOSECONDS) { + timeout = NANOSECONDS.convert(timeout, unit); + unit = NANOSECONDS; + } + + // Invoking get on the inputFuture will ensure our own run() + // method below is invoked as a listener when inputFuture sets + // its value. Therefore when get() returns we should then see + // the outputFuture be created. + ListenableFuture<? extends I> inputFuture = this.inputFuture; + if (inputFuture != null) { + long start = System.nanoTime(); + inputFuture.get(timeout, unit); + timeout -= Math.max(0, System.nanoTime() - start); + } + + // If our listener was scheduled to run on an executor we may + // need to wait for our listener to finish running before the + // outputFuture has been constructed by the function. + long start = System.nanoTime(); + if (!outputCreated.await(timeout, unit)) { + throw new TimeoutException(); + } + timeout -= Math.max(0, System.nanoTime() - start); + + // Like above with the inputFuture, we have a listener on + // the outputFuture that will set our own value when its + // value is set. Invoking get will ensure the output can + // complete and invoke our listener, so that we can later + // get the result. + ListenableFuture<? extends O> outputFuture = this.outputFuture; + if (outputFuture != null) { + outputFuture.get(timeout, unit); + } + } + return super.get(timeout, unit); + } + @Override public boolean cancel(boolean mayInterruptIfRunning) { /* @@ -824,6 +693,7 @@ public final class Futures { if (super.cancel(mayInterruptIfRunning)) { // This should never block since only one thread is allowed to cancel // this Future. + putUninterruptibly(mayInterruptIfRunningChannel, mayInterruptIfRunning); cancel(inputFuture, mayInterruptIfRunning); cancel(outputFuture, mayInterruptIfRunning); return true; @@ -859,7 +729,13 @@ public final class Futures { final ListenableFuture<? extends O> outputFuture = this.outputFuture = function.apply(sourceResult); if (isCancelled()) { - outputFuture.cancel(wasInterrupted()); + // Handles the case where cancel was called while the function was + // being applied. + // There is a gap in cancel(boolean) between calling sync.cancel() + // and storing the value of mayInterruptIfRunning, so this thread + // needs to block, waiting for that value. + outputFuture.cancel( + takeUninterruptibly(mayInterruptIfRunningChannel)); this.outputFuture = null; return; } @@ -907,52 +783,14 @@ public final class Futures { } /** - * Returns a new {@code ListenableFuture} whose result is the product of - * calling {@code get()} on the {@code Future} nested within the given {@code - * Future}, effectively chaining the futures one after the other. Example: - * - * <pre> {@code - * SettableFuture<ListenableFuture<String>> nested = SettableFuture.create(); - * ListenableFuture<String> dereferenced = dereference(nested); - * }</pre> - * - * <p>This call has the same cancellation and execution semantics as {@link - * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code - * Future} attempts to keep its cancellation state in sync with both the - * input {@code Future} and the nested {@code Future}. The transformation - * is very lightweight and therefore takes place in the thread that called - * {@code dereference}. - * - * @param nested The nested future to transform. - * @return A future that holds result of the inner future. - * @since 13.0 - */ - @Beta - @SuppressWarnings({"rawtypes", "unchecked"}) - public static <V> ListenableFuture<V> dereference( - ListenableFuture<? extends ListenableFuture<? extends V>> nested) { - return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER); - } - - /** - * Helper {@code Function} for {@link #dereference}. - */ - private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER = - new AsyncFunction<ListenableFuture<Object>, Object>() { - @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) { - return input; - } - }; - - /** * Creates a new {@code ListenableFuture} whose value is a list containing the * values of all its input futures, if all succeed. If any input fails, the * returned future fails. * * <p>The list of results is in the same order as the input list. * - * <p>Canceling this future will attempt to cancel all the component futures, - * and if any of the provided futures fails or is canceled, this one is, + * <p>Canceling this future does not cancel any of the component futures; + * however, if any of the provided futures fails or is canceled, this one is, * too. * * @param futures futures to combine @@ -963,7 +801,7 @@ public final class Futures { @Beta public static <V> ListenableFuture<List<V>> allAsList( ListenableFuture<? extends V>... futures) { - return listFuture(ImmutableList.copyOf(futures), true, + return new ListFuture<V>(ImmutableList.copyOf(futures), true, MoreExecutors.sameThreadExecutor()); } @@ -974,8 +812,8 @@ public final class Futures { * * <p>The list of results is in the same order as the input list. * - * <p>Canceling this future will attempt to cancel all the component futures, - * and if any of the provided futures fails or is canceled, this one is, + * <p>Canceling this future does not cancel any of the component futures; + * however, if any of the provided futures fails or is canceled, this one is, * too. * * @param futures futures to combine @@ -986,7 +824,7 @@ public final class Futures { @Beta public static <V> ListenableFuture<List<V>> allAsList( Iterable<? extends ListenableFuture<? extends V>> futures) { - return listFuture(ImmutableList.copyOf(futures), true, + return new ListFuture<V>(ImmutableList.copyOf(futures), true, MoreExecutors.sameThreadExecutor()); } @@ -998,8 +836,6 @@ public final class Futures { * indistinguishable from the future having a successful value of * {@code null}). * - * <p>Canceling this future will attempt to cancel all the component futures. - * * @param futures futures to combine * @return a future that provides a list of the results of the component * futures @@ -1008,7 +844,7 @@ public final class Futures { @Beta public static <V> ListenableFuture<List<V>> successfulAsList( ListenableFuture<? extends V>... futures) { - return listFuture(ImmutableList.copyOf(futures), false, + return new ListFuture<V>(ImmutableList.copyOf(futures), false, MoreExecutors.sameThreadExecutor()); } @@ -1020,8 +856,6 @@ public final class Futures { * indistinguishable from the future having a successful value of * {@code null}). * - * <p>Canceling this future will attempt to cancel all the component futures. - * * @param futures futures to combine * @return a future that provides a list of the results of the component * futures @@ -1030,7 +864,7 @@ public final class Futures { @Beta public static <V> ListenableFuture<List<V>> successfulAsList( Iterable<? extends ListenableFuture<? extends V>> futures) { - return listFuture(ImmutableList.copyOf(futures), false, + return new ListFuture<V>(ImmutableList.copyOf(futures), false, MoreExecutors.sameThreadExecutor()); } @@ -1055,26 +889,19 @@ public final class Futures { * } * });}</pre> * - * Note: If the callback is slow or heavyweight, consider {@linkplain - * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an - * executor}. If you do not supply an executor, {@code addCallback} will use - * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries - * some caveats for heavier operations. For example, the callback may run on - * an unpredictable or undesirable thread: - * - * <ul> - * <li>If the input {@code Future} is done at the time {@code addCallback} is - * called, {@code addCallback} will execute the callback inline. - * <li>If the input {@code Future} is not yet done, {@code addCallback} will - * schedule the callback to be run by the thread that completes the input - * {@code Future}, which may be an internal system thread such as an RPC - * network thread. - * </ul> - * - * Also note that, regardless of which thread executes the callback, all - * other registered but unexecuted listeners are prevented from running - * during its execution, even if those listeners are to run in other - * executors. + * <p>Note: This overload of {@code addCallback} is designed for cases in + * which the callack is fast and lightweight, as the method does not accept + * an {@code Executor} in which to perform the the work. For heavier + * callbacks, this overload carries some caveats: First, the thread that the + * callback runs in depends on whether the input {@code Future} is done at the + * time {@code addCallback} is called and on whether the input {@code Future} + * is ever cancelled. In particular, {@code addCallback} may execute the + * callback in the thread that calls {@code addCallback} or {@code + * Future.cancel}. Second, callbacks may run in an internal thread of the + * system responsible for the input {@code Future}, such as an RPC network + * thread. Finally, during the execution of a {@code sameThreadExecutor} + * callback, all other registered but unexecuted listeners are prevented from + * running, even if those listeners are to run in other executors. * * <p>For a more general interface to attach a completion listener to a * {@code Future}, see {@link ListenableFuture#addListener addListener}. @@ -1111,10 +938,20 @@ public final class Futures { * } * });}</pre> * - * When the callback is fast and lightweight, consider {@linkplain - * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or - * explicitly specifying {@code sameThreadExecutor}. However, be aware of the - * caveats documented in the link above. + * When the callback is fast and lightweight consider {@linkplain + * Futures#addCallback(ListenableFuture, FutureCallback) the other overload} + * or explicit use of {@link MoreExecutors#sameThreadExecutor + * sameThreadExecutor}. For heavier callbacks, this choice carries some + * caveats: First, the thread that the callback runs in depends on whether + * the input {@code Future} is done at the time {@code addCallback} is called + * and on whether the input {@code Future} is ever cancelled. In particular, + * {@code addCallback} may execute the callback in the thread that calls + * {@code addCallback} or {@code Future.cancel}. Second, callbacks may run in + * an internal thread of the system responsible for the input {@code Future}, + * such as an RPC network thread. Finally, during the execution of a {@code + * sameThreadExecutor} callback, all other registered but unexecuted + * listeners are prevented from running, even if those listeners are to run + * in other executors. * * <p>For a more general interface to attach a completion listener to a * {@code Future}, see {@link ListenableFuture#addListener addListener}. @@ -1131,22 +968,18 @@ public final class Futures { Runnable callbackListener = new Runnable() { @Override public void run() { - final V value; try { // TODO(user): (Before Guava release), validate that this // is the thing for IE. - value = getUninterruptibly(future); + V value = getUninterruptibly(future); + callback.onSuccess(value); } catch (ExecutionException e) { callback.onFailure(e.getCause()); - return; } catch (RuntimeException e) { callback.onFailure(e); - return; } catch (Error e) { callback.onFailure(e); - return; } - callback.onSuccess(value); } }; future.addListener(callbackListener, executor); @@ -1434,53 +1267,49 @@ public final class Futures { } } - private interface FutureCombiner<V, C> { - C combine(List<Optional<V>> values); - } - - private static class CombinedFuture<V, C> extends AbstractFuture<C> { - ImmutableCollection<? extends ListenableFuture<? extends V>> futures; + /** + * Class that implements {@link #allAsList} and {@link #successfulAsList}. + * The idea is to create a (null-filled) List and register a listener with + * each component future to fill out the value in the List when that future + * completes. + */ + private static class ListFuture<V> extends AbstractFuture<List<V>> { + ImmutableList<? extends ListenableFuture<? extends V>> futures; final boolean allMustSucceed; final AtomicInteger remaining; - FutureCombiner<V, C> combiner; - List<Optional<V>> values; + List<V> values; - CombinedFuture( - ImmutableCollection<? extends ListenableFuture<? extends V>> futures, - boolean allMustSucceed, Executor listenerExecutor, - FutureCombiner<V, C> combiner) { + /** + * Constructor. + * + * @param futures all the futures to build the list from + * @param allMustSucceed whether a single failure or cancellation should + * propagate to this future + * @param listenerExecutor used to run listeners on all the passed in + * futures. + */ + ListFuture( + final ImmutableList<? extends ListenableFuture<? extends V>> futures, + final boolean allMustSucceed, final Executor listenerExecutor) { this.futures = futures; + this.values = Lists.newArrayListWithCapacity(futures.size()); this.allMustSucceed = allMustSucceed; this.remaining = new AtomicInteger(futures.size()); - this.combiner = combiner; - this.values = Lists.newArrayListWithCapacity(futures.size()); + init(listenerExecutor); } - /** - * Must be called at the end of the constructor. - */ - protected void init(final Executor listenerExecutor) { + private void init(final Executor listenerExecutor) { // First, schedule cleanup to execute when the Future is done. addListener(new Runnable() { @Override public void run() { - // Cancel all the component futures. - if (CombinedFuture.this.isCancelled()) { - for (ListenableFuture<?> future : CombinedFuture.this.futures) { - future.cancel(CombinedFuture.this.wasInterrupted()); - } - } - // By now the values array has either been set as the Future's value, // or (in case of failure) is no longer useful. - CombinedFuture.this.futures = null; + ListFuture.this.values = null; // Let go of the memory held by other futures - CombinedFuture.this.values = null; - - // The combiner may also hold state, so free that as well - CombinedFuture.this.combiner = null; + ListFuture.this.futures = null; } }, MoreExecutors.sameThreadExecutor()); @@ -1488,7 +1317,7 @@ public final class Futures { // Corner case: List is empty. if (futures.isEmpty()) { - set(combiner.combine(ImmutableList.<Optional<V>>of())); + set(Lists.newArrayList(values)); return; } @@ -1503,11 +1332,11 @@ public final class Futures { // this loop, the last call to addListener() will callback to // setOneValue(), transitively call our cleanup listener, and set // this.futures to null. - // This is not actually a problem, since the foreach only needs - // this.futures to be non-null at the beginning of the loop. - int i = 0; - for (final ListenableFuture<? extends V> listenable : futures) { - final int index = i++; + // We store a reference to futures to avoid the NPE. + ImmutableList<? extends ListenableFuture<? extends V>> localFutures = futures; + for (int i = 0; i < localFutures.size(); i++) { + final ListenableFuture<? extends V> listenable = localFutures.get(i); + final int index = i; listenable.addListener(new Runnable() { @Override public void run() { @@ -1521,12 +1350,12 @@ public final class Futures { * Sets the value at the given index to that of the given future. */ private void setOneValue(int index, Future<? extends V> future) { - List<Optional<V>> localValues = values; + List<V> localValues = values; if (isDone() || localValues == null) { // Some other future failed or has been cancelled, causing this one to // also be cancelled or have an exception set. This should only happen - // if allMustSucceed is true or if the output itself has been cancelled. - checkState(allMustSucceed || isCancelled(), + // if allMustSucceed is true. + checkState(allMustSucceed, "Future was done before all dependencies completed"); return; } @@ -1534,8 +1363,7 @@ public final class Futures { try { checkState(future.isDone(), "Tried to set value from future which is not done"); - V returnValue = getUninterruptibly(future); - localValues.set(index, Optional.fromNullable(returnValue)); + localValues.set(index, getUninterruptibly(future)); } catch (CancellationException e) { if (allMustSucceed) { // Set ourselves as cancelled. Let the input futures keep running @@ -1561,9 +1389,9 @@ public final class Futures { int newRemaining = remaining.decrementAndGet(); checkState(newRemaining >= 0, "Less than 0 remaining futures"); if (newRemaining == 0) { - FutureCombiner<V, C> localCombiner = combiner; - if (localCombiner != null) { - set(localCombiner.combine(localValues)); + localValues = values; + if (localValues != null) { + set(Lists.newArrayList(localValues)); } else { checkState(isDone()); } @@ -1571,25 +1399,46 @@ public final class Futures { } } - } + @Override + public List<V> get() throws InterruptedException, ExecutionException { + callAllGets(); - /** Used for {@link #allAsList} and {@link #successfulAsList}. */ - private static <V> ListenableFuture<List<V>> listFuture( - ImmutableList<ListenableFuture<? extends V>> futures, - boolean allMustSucceed, Executor listenerExecutor) { - return new CombinedFuture<V, List<V>>( - futures, allMustSucceed, listenerExecutor, - new FutureCombiner<V, List<V>>() { - @Override - public List<V> combine(List<Optional<V>> values) { - List<V> result = Lists.newArrayList(); - for (Optional<V> element : values) { - result.add(element != null ? element.orNull() : null); + // This may still block in spite of the calls above, as the listeners may + // be scheduled for execution in other threads. + return super.get(); + } + + /** + * Calls the get method of all dependency futures to work around a bug in + * some ListenableFutures where the listeners aren't called until get() is + * called. + */ + private void callAllGets() throws InterruptedException { + List<? extends ListenableFuture<? extends V>> oldFutures = futures; + if (oldFutures != null && !isDone()) { + for (ListenableFuture<? extends V> future : oldFutures) { + // We wait for a little while for the future, but if it's not done, + // we check that no other futures caused a cancellation or failure. + // This can introduce a delay of up to 10ms in reporting an exception. + while (!future.isDone()) { + try { + future.get(); + } catch (Error e) { + throw e; + } catch (InterruptedException e) { + throw e; + } catch (Throwable e) { + // ExecutionException / CancellationException / RuntimeException + if (allMustSucceed) { + return; + } else { + continue; + } } - // TODO(user): This should ultimately return an unmodifiableList - return result; } - }); + } + } + } } /** diff --git a/guava/src/com/google/common/util/concurrent/JdkFutureAdapters.java b/guava/src/com/google/common/util/concurrent/JdkFutureAdapters.java index 645a648..6d74bda 100644 --- a/guava/src/com/google/common/util/concurrent/JdkFutureAdapters.java +++ b/guava/src/com/google/common/util/concurrent/JdkFutureAdapters.java @@ -19,6 +19,7 @@ package com.google.common.util.concurrent; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -41,7 +42,7 @@ public final class JdkFutureAdapters { * Assigns a thread to the given {@link Future} to provide {@link * ListenableFuture} functionality. * - * <p><b>Warning:</b> If the input future does not already implement {@code + * <p><b>Warning:</b> If the input future does not already implement {@link * ListenableFuture}, the returned future will emulate {@link * ListenableFuture#addListener} by taking a thread from an internal, * unbounded pool at the first call to {@code addListener} and holding it @@ -56,40 +57,17 @@ public final class JdkFutureAdapters { */ public static <V> ListenableFuture<V> listenInPoolThread( Future<V> future) { - if (future instanceof ListenableFuture) { + if (future instanceof ListenableFuture<?>) { return (ListenableFuture<V>) future; } return new ListenableFutureAdapter<V>(future); } - /** - * Submits a blocking task for the given {@link Future} to provide {@link - * ListenableFuture} functionality. - * - * <p><b>Warning:</b> If the input future does not already implement {@code - * ListenableFuture}, the returned future will emulate {@link - * ListenableFuture#addListener} by submitting a task to the given executor at - * at the first call to {@code addListener}. The task must be started by the - * executor promptly, or else the returned {@code ListenableFuture} may fail - * to work. The task's execution consists of blocking until the input future - * is {@linkplain Future#isDone() done}, so each call to this method may - * claim and hold a thread for an arbitrary length of time. Use of bounded - * executors or other executors that may fail to execute a task promptly may - * result in deadlocks. - * - * <p>Prefer to create {@code ListenableFuture} instances with {@link - * SettableFuture}, {@link MoreExecutors#listeningDecorator( - * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask}, - * {@link AbstractFuture}, and other utilities over creating plain {@code - * Future} instances to be upgraded to {@code ListenableFuture} after the - * fact. - * - * @since 12.0 - */ - public static <V> ListenableFuture<V> listenInPoolThread( + @VisibleForTesting + static <V> ListenableFuture<V> listenInPoolThread( Future<V> future, Executor executor) { checkNotNull(executor); - if (future instanceof ListenableFuture) { + if (future instanceof ListenableFuture<?>) { return (ListenableFuture<V>) future; } return new ListenableFutureAdapter<V>(future, executor); diff --git a/guava/src/com/google/common/util/concurrent/ListenableFuture.java b/guava/src/com/google/common/util/concurrent/ListenableFuture.java index eb05354..a0ab2db 100644 --- a/guava/src/com/google/common/util/concurrent/ListenableFuture.java +++ b/guava/src/com/google/common/util/concurrent/ListenableFuture.java @@ -28,10 +28,6 @@ import java.util.concurrent.RejectedExecutionException; * computation is {@linkplain Future#isDone() complete}. If the computation has * already completed when the listener is added, the listener will execute * immediately. - * - * <p>See the Guava User Guide article on <a href= - * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained"> - * {@code ListenableFuture}</a>. * * <h3>Purpose</h3> * @@ -102,28 +98,20 @@ public interface ListenableFuture<V> extends Future<V> { * * <p>Note: For fast, lightweight listeners that would be safe to execute in * any thread, consider {@link MoreExecutors#sameThreadExecutor}. For heavier - * listeners, {@code sameThreadExecutor()} carries some caveats. For - * example, the listener may run on an unpredictable or undesirable thread: + * listeners, {@code sameThreadExecutor()} carries some caveats: First, the + * thread that the listener runs in depends on whether the {@code Future} is + * done at the time it is added and on whether it is ever canclled. In + * particular, listeners may run in the thread that calls {@code addListener} + * or the thread that calls {@code cancel}. Second, listeners may run in an + * internal thread of the system responsible for the input {@code Future}, + * such as an RPC network thread. Finally, during the execution of a {@code + * sameThreadExecutor()} listener, all other registered but unexecuted + * listeners are prevented from running, even if those listeners are to run + * in other executors. * - * <ul> - * <li>If the input {@code Future} is done at the time {@code addListener} is - * called, {@code addListener} will execute the listener inline. - * <li>If the input {@code Future} is not yet done, {@code addListener} will - * schedule the listener to be run by the thread that completes the input - * {@code Future}, which may be an internal system thread such as an RPC - * network thread. - * </ul> - * - * Also note that, regardless of which thread executes the listener, all - * other registered but unexecuted listeners are prevented from running - * during its execution, even if those listeners are to run in other - * executors. - * - * <p>This is the most general listener interface. For common operations - * performed using listeners, see {@link - * com.google.common.util.concurrent.Futures}. For a simplified but general - * listener interface, see {@link - * com.google.common.util.concurrent.Futures#addCallback addCallback()}. + * <p>This is the most general listener interface. + * For common operations performed using listeners, + * see {@link com.google.common.util.concurrent.Futures} * * @param listener the listener to run when the computation is complete * @param executor the executor to run the listener in diff --git a/guava/src/com/google/common/util/concurrent/ListenableFutureTask.java b/guava/src/com/google/common/util/concurrent/ListenableFutureTask.java index 35d6f13..474635c 100644 --- a/guava/src/com/google/common/util/concurrent/ListenableFutureTask.java +++ b/guava/src/com/google/common/util/concurrent/ListenableFutureTask.java @@ -27,17 +27,12 @@ import javax.annotation.Nullable; * interface. Unlike {@code FutureTask}, {@code ListenableFutureTask} does not * provide an overrideable {@link FutureTask#done() done()} method. For similar * functionality, call {@link #addListener}. - * - * <p> * * @author Sven Mawson * @since 1.0 */ -public class ListenableFutureTask<V> extends FutureTask<V> +public final class ListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> { - // TODO(cpovirk): explore ways of making ListenableFutureTask final. There are - // some valid reasons such as BoundedQueueExecutorService to allow extends but it - // would be nice to make it final to avoid unintended usage. // The execution list to hold our listeners. private final ExecutionList executionList = new ExecutionList(); @@ -70,11 +65,11 @@ public class ListenableFutureTask<V> extends FutureTask<V> return new ListenableFutureTask<V>(runnable, result); } - ListenableFutureTask(Callable<V> callable) { + private ListenableFutureTask(Callable<V> callable) { super(callable); } - ListenableFutureTask(Runnable runnable, @Nullable V result) { + private ListenableFutureTask(Runnable runnable, @Nullable V result) { super(runnable, result); } diff --git a/guava/src/com/google/common/util/concurrent/MoreExecutors.java b/guava/src/com/google/common/util/concurrent/MoreExecutors.java index bd94db7..915b96d 100644 --- a/guava/src/com/google/common/util/concurrent/MoreExecutors.java +++ b/guava/src/com/google/common/util/concurrent/MoreExecutors.java @@ -16,26 +16,15 @@ package com.google.common.util.concurrent; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.Beta; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Throwables; -import com.google.common.collect.Lists; -import com.google.common.collect.Queues; -import java.lang.reflect.InvocationTargetException; -import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.List; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -44,7 +33,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -79,8 +67,16 @@ public final class MoreExecutors { @Beta public static ExecutorService getExitingExecutorService( ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) { - return new Application() - .getExitingExecutorService(executor, terminationTimeout, timeUnit); + executor.setThreadFactory(new ThreadFactoryBuilder() + .setDaemon(true) + .setThreadFactory(executor.getThreadFactory()) + .build()); + + ExecutorService service = Executors.unconfigurableExecutorService(executor); + + addDelayedShutdownHook(service, terminationTimeout, timeUnit); + + return service; } /** @@ -101,9 +97,19 @@ public final class MoreExecutors { */ @Beta public static ScheduledExecutorService getExitingScheduledExecutorService( - ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) { - return new Application() - .getExitingScheduledExecutorService(executor, terminationTimeout, timeUnit); + ScheduledThreadPoolExecutor executor, long terminationTimeout, + TimeUnit timeUnit) { + executor.setThreadFactory(new ThreadFactoryBuilder() + .setDaemon(true) + .setThreadFactory(executor.getThreadFactory()) + .build()); + + ScheduledExecutorService service = + Executors.unconfigurableScheduledExecutorService(executor); + + addDelayedShutdownHook(service, terminationTimeout, timeUnit); + + return service; } /** @@ -119,9 +125,24 @@ public final class MoreExecutors { */ @Beta public static void addDelayedShutdownHook( - ExecutorService service, long terminationTimeout, TimeUnit timeUnit) { - new Application() - .addDelayedShutdownHook(service, terminationTimeout, timeUnit); + final ExecutorService service, final long terminationTimeout, + final TimeUnit timeUnit) { + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + try { + // We'd like to log progress and failures that may arise in the + // following code, but unfortunately the behavior of logging + // is undefined in shutdown hooks. + // This is because the logging code installs a shutdown hook of its + // own. See Cleaner class inside {@link LogManager}. + service.shutdown(); + service.awaitTermination(terminationTimeout, timeUnit); + } catch (InterruptedException ignored) { + // We're shutting down anyway, so just ignore. + } + } + })); } /** @@ -140,8 +161,9 @@ public final class MoreExecutors { * @return an unmodifiable version of the input which will not hang the JVM */ @Beta - public static ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) { - return new Application().getExitingExecutorService(executor); + public static ExecutorService getExitingExecutorService( + ThreadPoolExecutor executor) { + return getExitingExecutorService(executor, 120, TimeUnit.SECONDS); } /** @@ -162,69 +184,7 @@ public final class MoreExecutors { @Beta public static ScheduledExecutorService getExitingScheduledExecutorService( ScheduledThreadPoolExecutor executor) { - return new Application().getExitingScheduledExecutorService(executor); - } - - /** Represents the current application to register shutdown hooks. */ - @VisibleForTesting static class Application { - - final ExecutorService getExitingExecutorService( - ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) { - useDaemonThreadFactory(executor); - ExecutorService service = Executors.unconfigurableExecutorService(executor); - addDelayedShutdownHook(service, terminationTimeout, timeUnit); - return service; - } - - final ScheduledExecutorService getExitingScheduledExecutorService( - ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) { - useDaemonThreadFactory(executor); - ScheduledExecutorService service = Executors.unconfigurableScheduledExecutorService(executor); - addDelayedShutdownHook(service, terminationTimeout, timeUnit); - return service; - } - - final void addDelayedShutdownHook( - final ExecutorService service, final long terminationTimeout, final TimeUnit timeUnit) { - checkNotNull(service); - checkNotNull(timeUnit); - addShutdownHook(MoreExecutors.newThread("DelayedShutdownHook-for-" + service, new Runnable() { - @Override - public void run() { - try { - // We'd like to log progress and failures that may arise in the - // following code, but unfortunately the behavior of logging - // is undefined in shutdown hooks. - // This is because the logging code installs a shutdown hook of its - // own. See Cleaner class inside {@link LogManager}. - service.shutdown(); - service.awaitTermination(terminationTimeout, timeUnit); - } catch (InterruptedException ignored) { - // We're shutting down anyway, so just ignore. - } - } - })); - } - - final ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) { - return getExitingExecutorService(executor, 120, TimeUnit.SECONDS); - } - - final ScheduledExecutorService getExitingScheduledExecutorService( - ScheduledThreadPoolExecutor executor) { - return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS); - } - - @VisibleForTesting void addShutdownHook(Thread hook) { - Runtime.getRuntime().addShutdownHook(hook); - } - } - - private static void useDaemonThreadFactory(ThreadPoolExecutor executor) { - executor.setThreadFactory(new ThreadFactoryBuilder() - .setDaemon(true) - .setThreadFactory(executor.getThreadFactory()) - .build()); + return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS); } /** @@ -483,7 +443,6 @@ public final class MoreExecutors { private static class ScheduledListeningDecorator extends ListeningDecorator implements ListeningScheduledExecutorService { - @SuppressWarnings("hiding") final ScheduledExecutorService delegate; ScheduledListeningDecorator(ScheduledExecutorService delegate) { @@ -516,172 +475,4 @@ public final class MoreExecutors { command, initialDelay, delay, unit); } } - - /* - * This following method is a modified version of one found in - * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30 - * which contained the following notice: - * - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/publicdomain/zero/1.0/ - * Other contributors include Andrew Wright, Jeffrey Hayes, - * Pat Fisher, Mike Judd. - */ - - /** - * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService} - * implementations. - */ static <T> T invokeAnyImpl(ListeningExecutorService executorService, - Collection<? extends Callable<T>> tasks, boolean timed, long nanos) - throws InterruptedException, ExecutionException, TimeoutException { - checkNotNull(executorService); - int ntasks = tasks.size(); - checkArgument(ntasks > 0); - List<Future<T>> futures = Lists.newArrayListWithCapacity(ntasks); - BlockingQueue<Future<T>> futureQueue = Queues.newLinkedBlockingQueue(); - - // For efficiency, especially in executors with limited - // parallelism, check to see if previously submitted tasks are - // done before submitting more of them. This interleaving - // plus the exception mechanics account for messiness of main - // loop. - - try { - // Record exceptions so that if we fail to obtain any - // result, we can throw the last exception we got. - ExecutionException ee = null; - long lastTime = timed ? System.nanoTime() : 0; - Iterator<? extends Callable<T>> it = tasks.iterator(); - - futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue)); - --ntasks; - int active = 1; - - for (;;) { - Future<T> f = futureQueue.poll(); - if (f == null) { - if (ntasks > 0) { - --ntasks; - futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue)); - ++active; - } else if (active == 0) { - break; - } else if (timed) { - f = futureQueue.poll(nanos, TimeUnit.NANOSECONDS); - if (f == null) { - throw new TimeoutException(); - } - long now = System.nanoTime(); - nanos -= now - lastTime; - lastTime = now; - } else { - f = futureQueue.take(); - } - } - if (f != null) { - --active; - try { - return f.get(); - } catch (ExecutionException eex) { - ee = eex; - } catch (RuntimeException rex) { - ee = new ExecutionException(rex); - } - } - } - - if (ee == null) { - ee = new ExecutionException(null); - } - throw ee; - } finally { - for (Future<T> f : futures) { - f.cancel(true); - } - } - } - - /** - * Submits the task and adds a listener that adds the future to {@code queue} when it completes. - */ - private static <T> ListenableFuture<T> submitAndAddQueueListener( - ListeningExecutorService executorService, Callable<T> task, - final BlockingQueue<Future<T>> queue) { - final ListenableFuture<T> future = executorService.submit(task); - future.addListener(new Runnable() { - @Override public void run() { - queue.add(future); - } - }, MoreExecutors.sameThreadExecutor()); - return future; - } - - /** - * Returns a default thread factory used to create new threads. - * - * <p>On AppEngine, returns {@code ThreadManager.currentRequestThreadFactory()}. - * Otherwise, returns {@link Executors#defaultThreadFactory()}. - * - * @since 14.0 - */ - @Beta - public static ThreadFactory platformThreadFactory() { - if (!isAppEngine()) { - return Executors.defaultThreadFactory(); - } - try { - return (ThreadFactory) Class.forName("com.google.appengine.api.ThreadManager") - .getMethod("currentRequestThreadFactory") - .invoke(null); - } catch (IllegalAccessException e) { - throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e); - } catch (NoSuchMethodException e) { - throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e); - } catch (InvocationTargetException e) { - throw Throwables.propagate(e.getCause()); - } - } - - private static boolean isAppEngine() { - if (System.getProperty("com.google.appengine.runtime.environment") == null) { - return false; - } - try { - // If the current environment is null, we're not inside AppEngine. - return Class.forName("com.google.apphosting.api.ApiProxy") - .getMethod("getCurrentEnvironment") - .invoke(null) != null; - } catch (ClassNotFoundException e) { - // If ApiProxy doesn't exist, we're not on AppEngine at all. - return false; - } catch (InvocationTargetException e) { - // If ApiProxy throws an exception, we're not in a proper AppEngine environment. - return false; - } catch (IllegalAccessException e) { - // If the method isn't accessible, we're not on a supported version of AppEngine; - return false; - } catch (NoSuchMethodException e) { - // If the method doesn't exist, we're not on a supported version of AppEngine; - return false; - } - } - - /** - * Creates a thread using {@link #platformThreadFactory}, and sets its name to {@code name} - * unless changing the name is forbidden by the security manager. - */ - static Thread newThread(String name, Runnable runnable) { - checkNotNull(name); - checkNotNull(runnable); - Thread result = platformThreadFactory().newThread(runnable); - try { - result.setName(name); - } catch (SecurityException e) { - // OK if we can't set the name in this environment. - } - return result; - } } diff --git a/guava/src/com/google/common/util/concurrent/RateLimiter.java b/guava/src/com/google/common/util/concurrent/RateLimiter.java deleted file mode 100644 index 4085654..0000000 --- a/guava/src/com/google/common/util/concurrent/RateLimiter.java +++ /dev/null @@ -1,690 +0,0 @@ -/* - * Copyright (C) 2012 The Guava Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.common.util.concurrent; - -import com.google.common.annotations.Beta; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Ticker; - -import java.util.concurrent.TimeUnit; - -import javax.annotation.concurrent.ThreadSafe; - -/** - * A rate limiter. Conceptually, a rate limiter distributes permits at a - * configurable rate. Each {@link #acquire()} blocks if necessary until a permit is - * available, and then takes it. Once acquired, permits need not be released. - * - * <p>Rate limiters are often used to restrict the rate at which some - * physical or logical resource is accessed. This is in contrast to {@link - * java.util.concurrent.Semaphore} which restricts the number of concurrent - * accesses instead of the rate (note though that concurrency and rate are closely related, - * e.g. see <a href="http://en.wikipedia.org/wiki/Little's_law">Little's Law</a>). - * - * <p>A {@code RateLimiter} is defined primarily by the rate at which permits - * are issued. Absent additional configuration, permits will be distributed at a - * fixed rate, defined in terms of permits per second. Permits will be distributed - * smoothly, with the delay between individual permits being adjusted to ensure - * that the configured rate is maintained. - * - * <p>It is possible to configure a {@code RateLimiter} to have a warmup - * period during which time the permits issued each second steadily increases until - * it hits the stable rate. - * - * <p>As an example, imagine that we have a list of tasks to execute, but we don't want to - * submit more than 2 per second: - *<pre> {@code - * final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second" - * void submitTasks(List<Runnable> tasks, Executor executor) { - * for (Runnable task : tasks) { - * rateLimiter.acquire(); // may wait - * executor.execute(task); - * } - * } - *}</pre> - * - * <p>As another example, imagine that we produce a stream of data, and we want to cap it - * at 5kb per second. This could be accomplished by requiring a permit per byte, and specifying - * a rate of 5000 permits per second: - *<pre> {@code - * final RateLimiter rateLimiter = RateLimiter.create(5000.0); // rate = 5000 permits per second - * void submitPacket(byte[] packet) { - * rateLimiter.acquire(packet.length); - * networkService.send(packet); - * } - *}</pre> - * - * <p>It is important to note that the number of permits requested <i>never</i> - * affect the throttling of the request itself (an invocation to {@code acquire(1)} - * and an invocation to {@code acquire(1000)} will result in exactly the same throttling, if any), - * but it affects the throttling of the <i>next</i> request. I.e., if an expensive task - * arrives at an idle RateLimiter, it will be granted immediately, but it is the <i>next</i> - * request that will experience extra throttling, thus paying for the cost of the expensive - * task. - * - * <p>Note: {@code RateLimiter} does not provide fairness guarantees. - * - * @author Dimitris Andreou - * @since 13.0 - */ -// TODO(user): switch to nano precision. A natural unit of cost is "bytes", and a micro precision -// would mean a maximum rate of "1MB/s", which might be small in some cases. -@ThreadSafe -@Beta -public abstract class RateLimiter { - /* - * How is the RateLimiter designed, and why? - * - * The primary feature of a RateLimiter is its "stable rate", the maximum rate that - * is should allow at normal conditions. This is enforced by "throttling" incoming - * requests as needed, i.e. compute, for an incoming request, the appropriate throttle time, - * and make the calling thread wait as much. - * - * The simplest way to maintain a rate of QPS is to keep the timestamp of the last - * granted request, and ensure that (1/QPS) seconds have elapsed since then. For example, - * for a rate of QPS=5 (5 tokens per second), if we ensure that a request isn't granted - * earlier than 200ms after the the last one, then we achieve the intended rate. - * If a request comes and the last request was granted only 100ms ago, then we wait for - * another 100ms. At this rate, serving 15 fresh permits (i.e. for an acquire(15) request) - * naturally takes 3 seconds. - * - * It is important to realize that such a RateLimiter has a very superficial memory - * of the past: it only remembers the last request. What if the RateLimiter was unused for - * a long period of time, then a request arrived and was immediately granted? - * This RateLimiter would immediately forget about that past underutilization. This may - * result in either underutilization or overflow, depending on the real world consequences - * of not using the expected rate. - * - * Past underutilization could mean that excess resources are available. Then, the RateLimiter - * should speed up for a while, to take advantage of these resources. This is important - * when the rate is applied to networking (limiting bandwidth), where past underutilization - * typically translates to "almost empty buffers", which can be filled immediately. - * - * On the other hand, past underutilization could mean that "the server responsible for - * handling the request has become less ready for future requests", i.e. its caches become - * stale, and requests become more likely to trigger expensive operations (a more extreme - * case of this example is when a server has just booted, and it is mostly busy with getting - * itself up to speed). - * - * To deal with such scenarios, we add an extra dimension, that of "past underutilization", - * modeled by "storedPermits" variable. This variable is zero when there is no - * underutilization, and it can grow up to maxStoredPermits, for sufficiently large - * underutilization. So, the requested permits, by an invocation acquire(permits), - * are served from: - * - stored permits (if available) - * - fresh permits (for any remaining permits) - * - * How this works is best explained with an example: - * - * For a RateLimiter that produces 1 token per second, every second - * that goes by with the RateLimiter being unused, we increase storedPermits by 1. - * Say we leave the RateLimiter unused for 10 seconds (i.e., we expected a request at time - * X, but we are at time X + 10 seconds before a request actually arrives; this is - * also related to the point made in the last paragraph), thus storedPermits - * becomes 10.0 (assuming maxStoredPermits >= 10.0). At that point, a request of acquire(3) - * arrives. We serve this request out of storedPermits, and reduce that to 7.0 (how this is - * translated to throttling time is discussed later). Immediately after, assume that an - * acquire(10) request arriving. We serve the request partly from storedPermits, - * using all the remaining 7.0 permits, and the remaining 3.0, we serve them by fresh permits - * produced by the rate limiter. - * - * We already know how much time it takes to serve 3 fresh permits: if the rate is - * "1 token per second", then this will take 3 seconds. But what does it mean to serve 7 - * stored permits? As explained above, there is no unique answer. If we are primarily - * interested to deal with underutilization, then we want stored permits to be given out - * /faster/ than fresh ones, because underutilization = free resources for the taking. - * If we are primarily interested to deal with overflow, then stored permits could - * be given out /slower/ than fresh ones. Thus, we require a (different in each case) - * function that translates storedPermits to throtting time. - * - * This role is played by storedPermitsToWaitTime(double storedPermits, double permitsToTake). - * The underlying model is a continuous function mapping storedPermits - * (from 0.0 to maxStoredPermits) onto the 1/rate (i.e. intervals) that is effective at the given - * storedPermits. "storedPermits" essentially measure unused time; we spend unused time - * buying/storing permits. Rate is "permits / time", thus "1 / rate = time / permits". - * Thus, "1/rate" (time / permits) times "permits" gives time, i.e., integrals on this - * function (which is what storedPermitsToWaitTime() computes) correspond to minimum intervals - * between subsequent requests, for the specified number of requested permits. - * - * Here is an example of storedPermitsToWaitTime: - * If storedPermits == 10.0, and we want 3 permits, we take them from storedPermits, - * reducing them to 7.0, and compute the throttling for these as a call to - * storedPermitsToWaitTime(storedPermits = 10.0, permitsToTake = 3.0), which will - * evaluate the integral of the function from 7.0 to 10.0. - * - * Using integrals guarantees that the effect of a single acquire(3) is equivalent - * to { acquire(1); acquire(1); acquire(1); }, or { acquire(2); acquire(1); }, etc, - * since the integral of the function in [7.0, 10.0] is equivalent to the sum of the - * integrals of [7.0, 8.0], [8.0, 9.0], [9.0, 10.0] (and so on), no matter - * what the function is. This guarantees that we handle correctly requests of varying weight - * (permits), /no matter/ what the actual function is - so we can tweak the latter freely. - * (The only requirement, obviously, is that we can compute its integrals). - * - * Note well that if, for this function, we chose a horizontal line, at height of exactly - * (1/QPS), then the effect of the function is non-existent: we serve storedPermits at - * exactly the same cost as fresh ones (1/QPS is the cost for each). We use this trick later. - * - * If we pick a function that goes /below/ that horizontal line, it means that we reduce - * the area of the function, thus time. Thus, the RateLimiter becomes /faster/ after a - * period of underutilization. If, on the other hand, we pick a function that - * goes /above/ that horizontal line, then it means that the area (time) is increased, - * thus storedPermits are more costly than fresh permits, thus the RateLimiter becomes - * /slower/ after a period of underutilization. - * - * Last, but not least: consider a RateLimiter with rate of 1 permit per second, currently - * completely unused, and an expensive acquire(100) request comes. It would be nonsensical - * to just wait for 100 seconds, and /then/ start the actual task. Why wait without doing - * anything? A much better approach is to /allow/ the request right away (as if it was an - * acquire(1) request instead), and postpone /subsequent/ requests as needed. In this version, - * we allow starting the task immediately, and postpone by 100 seconds future requests, - * thus we allow for work to get done in the meantime instead of waiting idly. - * - * This has important consequences: it means that the RateLimiter doesn't remember the time - * of the _last_ request, but it remembers the (expected) time of the _next_ request. This - * also enables us to tell immediately (see tryAcquire(timeout)) whether a particular - * timeout is enough to get us to the point of the next scheduling time, since we always - * maintain that. And what we mean by "an unused RateLimiter" is also defined by that - * notion: when we observe that the "expected arrival time of the next request" is actually - * in the past, then the difference (now - past) is the amount of time that the RateLimiter - * was formally unused, and it is that amount of time which we translate to storedPermits. - * (We increase storedPermits with the amount of permits that would have been produced - * in that idle time). So, if rate == 1 permit per second, and arrivals come exactly - * one second after the previous, then storedPermits is _never_ increased -- we would only - * increase it for arrivals _later_ than the expected one second. - */ - - /** - * Creates a {@code RateLimiter} with the specified stable throughput, given as - * "permits per second" (commonly referred to as <i>QPS</i>, queries per second). - * - * <p>The returned {@code RateLimiter} ensures that on average no more than {@code - * permitsPerSecond} are issued during any given second, with sustained requests - * being smoothly spread over each second. When the incoming request rate exceeds - * {@code permitsPerSecond} the rate limiter will release one permit every {@code - * (1.0 / permitsPerSecond)} seconds. When the rate limiter is unused, - * bursts of up to {@code permitsPerSecond} permits will be allowed, with subsequent - * requests being smoothly limited at the stable rate of {@code permitsPerSecond}. - * - * @param permitsPerSecond the rate of the returned {@code RateLimiter}, measured in - * how many permits become available per second. - */ - public static RateLimiter create(double permitsPerSecond) { - return create(SleepingTicker.SYSTEM_TICKER, permitsPerSecond); - } - - @VisibleForTesting - static RateLimiter create(SleepingTicker ticker, double permitsPerSecond) { - RateLimiter rateLimiter = new Bursty(ticker); - rateLimiter.setRate(permitsPerSecond); - return rateLimiter; - } - - /** - * Creates a {@code RateLimiter} with the specified stable throughput, given as - * "permits per second" (commonly referred to as <i>QPS</i>, queries per second), and a - * <i>warmup period</i>, during which the {@code RateLimiter} smoothly ramps up its rate, - * until it reaches its maximum rate at the end of the period (as long as there are enough - * requests to saturate it). Similarly, if the {@code RateLimiter} is left <i>unused</i> for - * a duration of {@code warmupPeriod}, it will gradually return to its "cold" state, - * i.e. it will go through the same warming up process as when it was first created. - * - * <p>The returned {@code RateLimiter} is intended for cases where the resource that actually - * fulfils the requests (e.g., a remote server) needs "warmup" time, rather than - * being immediately accessed at the stable (maximum) rate. - * - * <p>The returned {@code RateLimiter} starts in a "cold" state (i.e. the warmup period - * will follow), and if it is left unused for long enough, it will return to that state. - * - * @param permitsPerSecond the rate of the returned {@code RateLimiter}, measured in - * how many permits become available per second - * @param warmupPeriod the duration of the period where the {@code RateLimiter} ramps up its - * rate, before reaching its stable (maximum) rate - * @param unit the time unit of the warmupPeriod argument - */ - // TODO(user): add a burst size of 1-second-worth of permits, as in the metronome? - public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) { - return create(SleepingTicker.SYSTEM_TICKER, permitsPerSecond, warmupPeriod, unit); - } - - @VisibleForTesting - static RateLimiter create( - SleepingTicker ticker, double permitsPerSecond, long warmupPeriod, TimeUnit timeUnit) { - RateLimiter rateLimiter = new WarmingUp(ticker, warmupPeriod, timeUnit); - rateLimiter.setRate(permitsPerSecond); - return rateLimiter; - } - - @VisibleForTesting - static RateLimiter createBursty( - SleepingTicker ticker, double permitsPerSecond, int maxBurstSize) { - Bursty rateLimiter = new Bursty(ticker); - rateLimiter.setRate(permitsPerSecond); - rateLimiter.maxPermits = maxBurstSize; - return rateLimiter; - } - - /** - * The underlying timer; used both to measure elapsed time and sleep as necessary. A separate - * object to facilitate testing. - */ - private final SleepingTicker ticker; - - /** - * The timestamp when the RateLimiter was created; used to avoid possible overflow/time-wrapping - * errors. - */ - private final long offsetNanos; - - /** - * The currently stored permits. - */ - double storedPermits; - - /** - * The maximum number of stored permits. - */ - double maxPermits; - - /** - * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits - * per second has a stable interval of 200ms. - */ - volatile double stableIntervalMicros; - - private final Object mutex = new Object(); - - /** - * The time when the next request (no matter its size) will be granted. After granting a request, - * this is pushed further in the future. Large requests push this further than small requests. - */ - private long nextFreeTicketMicros = 0L; // could be either in the past or future - - private RateLimiter(SleepingTicker ticker) { - this.ticker = ticker; - this.offsetNanos = ticker.read(); - } - - /** - * Updates the stable rate of this {@code RateLimiter}, that is, the - * {@code permitsPerSecond} argument provided in the factory method that - * constructed the {@code RateLimiter}. Currently throttled threads will <b>not</b> - * be awakened as a result of this invocation, thus they do not observe the new rate; - * only subsequent requests will. - * - * <p>Note though that, since each request repays (by waiting, if necessary) the cost - * of the <i>previous</i> request, this means that the very next request - * after an invocation to {@code setRate} will not be affected by the new rate; - * it will pay the cost of the previous request, which is in terms of the previous rate. - * - * <p>The behavior of the {@code RateLimiter} is not modified in any other way, - * e.g. if the {@code RateLimiter} was configured with a warmup period of 20 seconds, - * it still has a warmup period of 20 seconds after this method invocation. - * - * @param permitsPerSecond the new stable rate of this {@code RateLimiter}. - */ - public final void setRate(double permitsPerSecond) { - Preconditions.checkArgument(permitsPerSecond > 0.0 - && !Double.isNaN(permitsPerSecond), "rate must be positive"); - synchronized (mutex) { - resync(readSafeMicros()); - double stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond; - this.stableIntervalMicros = stableIntervalMicros; - doSetRate(permitsPerSecond, stableIntervalMicros); - } - } - - abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros); - - /** - * Returns the stable rate (as {@code permits per seconds}) with which this - * {@code RateLimiter} is configured with. The initial value of this is the same as - * the {@code permitsPerSecond} argument passed in the factory method that produced - * this {@code RateLimiter}, and it is only updated after invocations - * to {@linkplain #setRate}. - */ - public final double getRate() { - return TimeUnit.SECONDS.toMicros(1L) / stableIntervalMicros; - } - - /** - * Acquires a permit from this {@code RateLimiter}, blocking until the request can be granted. - * - * <p>This method is equivalent to {@code acquire(1)}. - */ - public void acquire() { - acquire(1); - } - - /** - * Acquires the given number of permits from this {@code RateLimiter}, blocking until the - * request be granted. - * - * @param permits the number of permits to acquire - */ - public void acquire(int permits) { - checkPermits(permits); - long microsToWait; - synchronized (mutex) { - microsToWait = reserveNextTicket(permits, readSafeMicros()); - } - ticker.sleepMicrosUninterruptibly(microsToWait); - } - - /** - * Acquires a permit from this {@code RateLimiter} if it can be obtained - * without exceeding the specified {@code timeout}, or returns {@code false} - * immediately (without waiting) if the permit would not have been granted - * before the timeout expired. - * - * <p>This method is equivalent to {@code tryAcquire(1, timeout, unit)}. - * - * @param timeout the maximum time to wait for the permit - * @param unit the time unit of the timeout argument - * @return {@code true} if the permit was acquired, {@code false} otherwise - */ - public boolean tryAcquire(long timeout, TimeUnit unit) { - return tryAcquire(1, timeout, unit); - } - - /** - * Acquires permits from this {@link RateLimiter} if it can be acquired immediately without delay. - * - * <p> - * This method is equivalent to {@code tryAcquire(permits, 0, anyUnit)}. - * - * @param permits the number of permits to acquire - * @return {@code true} if the permits were acquired, {@code false} otherwise - * @since 14.0 - */ - public boolean tryAcquire(int permits) { - return tryAcquire(permits, 0, TimeUnit.MICROSECONDS); - } - - /** - * Acquires a permit from this {@link RateLimiter} if it can be acquired immediately without - * delay. - * - * <p> - * This method is equivalent to {@code tryAcquire(1)}. - * - * @return {@code true} if the permit was acquired, {@code false} otherwise - * @since 14.0 - */ - public boolean tryAcquire() { - return tryAcquire(1, 0, TimeUnit.MICROSECONDS); - } - - /** - * Acquires the given number of permits from this {@code RateLimiter} if it can be obtained - * without exceeding the specified {@code timeout}, or returns {@code false} - * immediately (without waiting) if the permits would not have been granted - * before the timeout expired. - * - * @param permits the number of permits to acquire - * @param timeout the maximum time to wait for the permits - * @param unit the time unit of the timeout argument - * @return {@code true} if the permits were acquired, {@code false} otherwise - */ - public boolean tryAcquire(int permits, long timeout, TimeUnit unit) { - long timeoutMicros = unit.toMicros(timeout); - checkPermits(permits); - long microsToWait; - synchronized (mutex) { - long nowMicros = readSafeMicros(); - if (nextFreeTicketMicros > nowMicros + timeoutMicros) { - return false; - } else { - microsToWait = reserveNextTicket(permits, nowMicros); - } - } - ticker.sleepMicrosUninterruptibly(microsToWait); - return true; - } - - private static void checkPermits(int permits) { - Preconditions.checkArgument(permits > 0, "Requested permits must be positive"); - } - - /** - * Reserves next ticket and returns the wait time that the caller must wait for. - */ - private long reserveNextTicket(double requiredPermits, long nowMicros) { - resync(nowMicros); - long microsToNextFreeTicket = nextFreeTicketMicros - nowMicros; - double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits); - double freshPermits = requiredPermits - storedPermitsToSpend; - - long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) - + (long) (freshPermits * stableIntervalMicros); - - this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros; - this.storedPermits -= storedPermitsToSpend; - return microsToNextFreeTicket; - } - - /** - * Translates a specified portion of our currently stored permits which we want to - * spend/acquire, into a throttling time. Conceptually, this evaluates the integral - * of the underlying function we use, for the range of - * [(storedPermits - permitsToTake), storedPermits]. - * - * This always holds: {@code 0 <= permitsToTake <= storedPermits} - */ - abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake); - - private void resync(long nowMicros) { - // if nextFreeTicket is in the past, resync to now - if (nowMicros > nextFreeTicketMicros) { - storedPermits = Math.min(maxPermits, - storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros); - nextFreeTicketMicros = nowMicros; - } - } - - private long readSafeMicros() { - return TimeUnit.NANOSECONDS.toMicros(ticker.read() - offsetNanos); - } - - @Override - public String toString() { - return String.format("RateLimiter[stableRate=%3.1fqps]", 1000000.0 / stableIntervalMicros); - } - - /** - * This implements the following function: - * - * ^ throttling - * | - * 3*stable + / - * interval | /. - * (cold) | / . - * | / . <-- "warmup period" is the area of the trapezoid between - * 2*stable + / . halfPermits and maxPermits - * interval | / . - * | / . - * | / . - * stable +----------/ WARM . } - * interval | . UP . } <-- this rectangle (from 0 to maxPermits, and - * | . PERIOD. } height == stableInterval) defines the cooldown period, - * | . . } and we want cooldownPeriod == warmupPeriod - * |---------------------------------> storedPermits - * (halfPermits) (maxPermits) - * - * Before going into the details of this particular function, let's keep in mind the basics: - * 1) The state of the RateLimiter (storedPermits) is a vertical line in this figure. - * 2) When the RateLimiter is not used, this goes right (up to maxPermits) - * 3) When the RateLimiter is used, this goes left (down to zero), since if we have storedPermits, - * we serve from those first - * 4) When _unused_, we go right at the same speed (rate)! I.e., if our rate is - * 2 permits per second, and 3 unused seconds pass, we will always save 6 permits - * (no matter what our initial position was), up to maxPermits. - * If we invert the rate, we get the "stableInterval" (interval between two requests - * in a perfectly spaced out sequence of requests of the given rate). Thus, if you - * want to see "how much time it will take to go from X storedPermits to X+K storedPermits?", - * the answer is always stableInterval * K. In the same example, for 2 permits per second, - * stableInterval is 500ms. Thus to go from X storedPermits to X+6 storedPermits, we - * require 6 * 500ms = 3 seconds. - * - * In short, the time it takes to move to the right (save K permits) is equal to the - * rectangle of width == K and height == stableInterval. - * 4) When _used_, the time it takes, as explained in the introductory class note, is - * equal to the integral of our function, between X permits and X-K permits, assuming - * we want to spend K saved permits. - * - * In summary, the time it takes to move to the left (spend K permits), is equal to the - * area of the function of width == K. - * - * Let's dive into this function now: - * - * When we have storedPermits <= halfPermits (the left portion of the function), then - * we spend them at the exact same rate that - * fresh permits would be generated anyway (that rate is 1/stableInterval). We size - * this area to be equal to _half_ the specified warmup period. Why we need this? - * And why half? We'll explain shortly below (after explaining the second part). - * - * Stored permits that are beyond halfPermits, are mapped to an ascending line, that goes - * from stableInterval to 3 * stableInterval. The average height for that part is - * 2 * stableInterval, and is sized appropriately to have an area _equal_ to the - * specified warmup period. Thus, by point (4) above, it takes "warmupPeriod" amount of time - * to go from maxPermits to halfPermits. - * - * BUT, by point (3) above, it only takes "warmupPeriod / 2" amount of time to return back - * to maxPermits, from halfPermits! (Because the trapezoid has double the area of the rectangle - * of height stableInterval and equivalent width). We decided that the "cooldown period" - * time should be equivalent to "warmup period", thus a fully saturated RateLimiter - * (with zero stored permits, serving only fresh ones) can go to a fully unsaturated - * (with storedPermits == maxPermits) in the same amount of time it takes for a fully - * unsaturated RateLimiter to return to the stableInterval -- which happens in halfPermits, - * since beyond that point, we use a horizontal line of "stableInterval" height, simulating - * the regular rate. - * - * Thus, we have figured all dimensions of this shape, to give all the desired - * properties: - * - the width is warmupPeriod / stableInterval, to make cooldownPeriod == warmupPeriod - * - the slope starts at the middle, and goes from stableInterval to 3*stableInterval so - * to have halfPermits being spend in double the usual time (half the rate), while their - * respective rate is steadily ramping up - */ - private static class WarmingUp extends RateLimiter { - - final long warmupPeriodMicros; - /** - * The slope of the line from the stable interval (when permits == 0), to the cold interval - * (when permits == maxPermits) - */ - private double slope; - private double halfPermits; - - WarmingUp(SleepingTicker ticker, long warmupPeriod, TimeUnit timeUnit) { - super(ticker); - this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod); - } - - @Override - void doSetRate(double permitsPerSecond, double stableIntervalMicros) { - double oldMaxPermits = maxPermits; - maxPermits = warmupPeriodMicros / stableIntervalMicros; - halfPermits = maxPermits / 2.0; - // Stable interval is x, cold is 3x, so on average it's 2x. Double the time -> halve the rate - double coldIntervalMicros = stableIntervalMicros * 3.0; - slope = (coldIntervalMicros - stableIntervalMicros) / halfPermits; - if (oldMaxPermits == Double.POSITIVE_INFINITY) { - // if we don't special-case this, we would get storedPermits == NaN, below - storedPermits = 0.0; - } else { - storedPermits = (oldMaxPermits == 0.0) - ? maxPermits // initial state is cold - : storedPermits * maxPermits / oldMaxPermits; - } - } - - @Override - long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { - double availablePermitsAboveHalf = storedPermits - halfPermits; - long micros = 0; - // measuring the integral on the right part of the function (the climbing line) - if (availablePermitsAboveHalf > 0.0) { - double permitsAboveHalfToTake = Math.min(availablePermitsAboveHalf, permitsToTake); - micros = (long) (permitsAboveHalfToTake * (permitsToTime(availablePermitsAboveHalf) - + permitsToTime(availablePermitsAboveHalf - permitsAboveHalfToTake)) / 2.0); - permitsToTake -= permitsAboveHalfToTake; - } - // measuring the integral on the left part of the function (the horizontal line) - micros += (stableIntervalMicros * permitsToTake); - return micros; - } - - private double permitsToTime(double permits) { - return stableIntervalMicros + permits * slope; - } - } - - /** - * This implements a trivial function, where storedPermits are translated to - * zero throttling - thus, a client gets an infinite speedup for permits acquired out - * of the storedPermits pool. This is also used for the special case of the "metronome", - * where the width of the function is also zero; maxStoredPermits is zero, thus - * storedPermits and permitsToTake are always zero as well. Such a RateLimiter can - * not save permits when unused, thus all permits it serves are fresh, using the - * designated rate. - */ - private static class Bursty extends RateLimiter { - Bursty(SleepingTicker ticker) { - super(ticker); - } - - @Override - void doSetRate(double permitsPerSecond, double stableIntervalMicros) { - double oldMaxPermits = this.maxPermits; - /* - * We allow the equivalent work of up to one second to be granted with zero waiting, if the - * rate limiter has been unused for as much. This is to avoid potentially producing tiny - * wait interval between subsequent requests for sufficiently large rates, which would - * unnecessarily overconstrain the thread scheduler. - */ - maxPermits = permitsPerSecond; // one second worth of permits - storedPermits = (oldMaxPermits == 0.0) - ? 0.0 // initial state - : storedPermits * maxPermits / oldMaxPermits; - } - - @Override - long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { - return 0L; - } - } - - @VisibleForTesting - static abstract class SleepingTicker extends Ticker { - abstract void sleepMicrosUninterruptibly(long micros); - - static final SleepingTicker SYSTEM_TICKER = new SleepingTicker() { - @Override - public long read() { - return systemTicker().read(); - } - - @Override - public void sleepMicrosUninterruptibly(long micros) { - if (micros > 0) { - Uninterruptibles.sleepUninterruptibly(micros, TimeUnit.MICROSECONDS); - } - } - }; - } -} diff --git a/guava/src/com/google/common/util/concurrent/Service.java b/guava/src/com/google/common/util/concurrent/Service.java index 861164e..9ad1f3d 100644 --- a/guava/src/com/google/common/util/concurrent/Service.java +++ b/guava/src/com/google/common/util/concurrent/Service.java @@ -19,58 +19,57 @@ package com.google.common.util.concurrent; import com.google.common.annotations.Beta; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; /** - * An object with an operational state, plus asynchronous {@link #start()} and {@link #stop()} - * lifecycle methods to transition between states. Example services include webservers, RPC servers - * and timers. - * - * <p>The normal lifecycle of a service is: + * An object with an operational state, plus asynchronous {@link #start()} and + * {@link #stop()} lifecycle methods to transfer into and out of this state. + * Example services include webservers, RPC servers and timers. The normal + * lifecycle of a service is: * <ul> - * <li>{@linkplain State#NEW NEW} -> - * <li>{@linkplain State#STARTING STARTING} -> - * <li>{@linkplain State#RUNNING RUNNING} -> - * <li>{@linkplain State#STOPPING STOPPING} -> - * <li>{@linkplain State#TERMINATED TERMINATED} + * <li>{@link State#NEW} -></li> + * <li>{@link State#STARTING} -></li> + * <li>{@link State#RUNNING} -></li> + * <li>{@link State#STOPPING} -></li> + * <li>{@link State#TERMINATED}</li> * </ul> * - * <p>There are deviations from this if there are failures or if {@link Service#stop} is called - * before the {@link Service} reaches the {@linkplain State#RUNNING RUNNING} state. The set of legal - * transitions form a <a href="http://en.wikipedia.org/wiki/Directed_acyclic_graph">DAG</a>, - * therefore every method of the listener will be called at most once. N.B. The {@link State#FAILED} - * and {@link State#TERMINATED} states are terminal states, once a service enters either of these - * states it cannot ever leave them. + * If the service fails while starting, running or stopping, its state will be + * {@link State#FAILED}, and its behavior is undefined. Such a service cannot be + * started nor stopped. * - * <p>Implementors of this interface are strongly encouraged to extend one of the abstract classes - * in this package which implement this interface and make the threading and state management - * easier. + * <p>Implementors of this interface are strongly encouraged to extend one of + * the abstract classes in this package which implement this interface and + * make the threading and state management easier. * * @author Jesse Wilson - * @author Luke Sandberg - * @since 9.0 (in 1.0 as {@code com.google.common.base.Service}) + * @since 9.0 (in 1.0 as + * {@code com.google.common.base.Service}) */ -@Beta +@Beta // TODO(kevinb): make abstract class? public interface Service { /** - * If the service state is {@link State#NEW}, this initiates service startup and returns - * immediately. If the service has already been started, this method returns immediately without - * taking action. A stopped service may not be restarted. + * If the service state is {@link State#NEW}, this initiates service startup + * and returns immediately. If the service has already been started, this + * method returns immediately without taking action. A stopped service may not + * be restarted. * - * @return a future for the startup result, regardless of whether this call initiated startup. - * Calling {@link ListenableFuture#get} will block until the service has finished - * starting, and returns one of {@link State#RUNNING}, {@link State#STOPPING} or - * {@link State#TERMINATED}. If the service fails to start, {@link ListenableFuture#get} - * will throw an {@link ExecutionException}, and the service's state will be - * {@link State#FAILED}. If it has already finished starting, {@link ListenableFuture#get} - * returns immediately. Cancelling this future has no effect on the service. + * @return a future for the startup result, regardless of whether this call + * initiated startup. Calling {@link ListenableFuture#get} will block + * until the service has finished starting, and returns one of {@link + * State#RUNNING}, {@link State#STOPPING} or {@link State#TERMINATED}. If + * the service fails to start, {@link ListenableFuture#get} will throw an + * {@link ExecutionException}, and the service's state will be {@link + * State#FAILED}. If it has already finished starting, {@link + * ListenableFuture#get} returns immediately. Cancelling this future has + * no effect on the service. */ ListenableFuture<State> start(); /** - * Initiates service startup (if necessary), returning once the service has finished starting. - * Unlike calling {@code start().get()}, this method throws no checked exceptions, and it cannot - * be {@linkplain Thread#interrupt interrupted}. + * Initiates service startup (if necessary), returning once the service has + * finished starting. Unlike calling {@code start().get()}, this method throws + * no checked exceptions, and it cannot be {@linkplain Thread#interrupt + * interrupted}. * * @throws UncheckedExecutionException if startup failed * @return the state of the service when startup finished. @@ -88,67 +87,39 @@ public interface Service { State state(); /** - * If the service is {@linkplain State#STARTING starting} or {@linkplain State#RUNNING running}, - * this initiates service shutdown and returns immediately. If the service is - * {@linkplain State#NEW new}, it is {@linkplain State#TERMINATED terminated} without having been - * started nor stopped. If the service has already been stopped, this method returns immediately - * without taking action. + * If the service is {@linkplain State#STARTING starting} or {@linkplain + * State#RUNNING running}, this initiates service shutdown and returns + * immediately. If the service is {@linkplain State#NEW new}, it is + * {@linkplain State#TERMINATED terminated} without having been started nor + * stopped. If the service has already been stopped, this method returns + * immediately without taking action. * - * @return a future for the shutdown result, regardless of whether this call initiated shutdown. - * Calling {@link ListenableFuture#get} will block until the service has finished shutting - * down, and either returns {@link State#TERMINATED} or throws an - * {@link ExecutionException}. If it has already finished stopping, - * {@link ListenableFuture#get} returns immediately. Cancelling this future has no effect - * on the service. + * @return a future for the shutdown result, regardless of whether this call + * initiated shutdown. Calling {@link ListenableFuture#get} will block + * until the service has finished shutting down, and either returns + * {@link State#TERMINATED} or throws an {@link ExecutionException}. If + * it has already finished stopping, {@link ListenableFuture#get} returns + * immediately. Cancelling this future has no effect on the service. */ ListenableFuture<State> stop(); /** - * Initiates service shutdown (if necessary), returning once the service has finished stopping. If - * this is {@link State#STARTING}, startup will be cancelled. If this is {@link State#NEW}, it is - * {@link State#TERMINATED terminated} without having been started nor stopped. Unlike calling - * {@code stop().get()}, this method throws no checked exceptions. + * Initiates service shutdown (if necessary), returning once the service has + * finished stopping. If this is {@link State#STARTING}, startup will be + * cancelled. If this is {@link State#NEW}, it is {@link State#TERMINATED + * terminated} without having been started nor stopped. Unlike calling {@code + * stop().get()}, this method throws no checked exceptions. * - * @throws UncheckedExecutionException if the service has failed or fails during shutdown + * @throws UncheckedExecutionException if shutdown failed * @return the state of the service when shutdown finished. */ State stopAndWait(); /** - * Returns the {@link Throwable} that caused this service to fail. - * - * @throws IllegalStateException if this service's state isn't {@linkplain State#FAILED FAILED}. - * - * @since 14.0 - */ - Throwable failureCause(); - - /** - * Registers a {@link Listener} to be {@linkplain Executor#execute executed} on the given - * executor. The listener will have the corresponding transition method called whenever the - * service changes state. The listener will not have previous state changes replayed, so it is - * suggested that listeners are added before the service starts. - * - * <p>There is no guaranteed ordering of execution of listeners, but any listener added through - * this method is guaranteed to be called whenever there is a state change. - * - * <p>Exceptions thrown by a listener will be propagated up to the executor. Any exception thrown - * during {@code Executor.execute} (e.g., a {@code RejectedExecutionException} or an exception - * thrown by {@linkplain MoreExecutors#sameThreadExecutor inline execution}) will be caught and - * logged. - * - * @param listener the listener to run when the service changes state is complete - * @param executor the executor in which the the listeners callback methods will be run. For fast, - * lightweight listeners that would be safe to execute in any thread, consider - * {@link MoreExecutors#sameThreadExecutor}. - * @since 13.0 - */ - void addListener(Listener listener, Executor executor); - - /** * The lifecycle states of a service. * - * @since 9.0 (in 1.0 as {@code com.google.common.base.Service.State}) + * @since 9.0 (in 1.0 as + * {@code com.google.common.base.Service.State}) */ @Beta // should come out of Beta when Service does enum State { @@ -174,70 +145,15 @@ public interface Service { STOPPING, /** - * A service in this state has completed execution normally. It does minimal work and consumes - * minimal resources. + * A service in this state has completed execution normally. It does minimal + * work and consumes minimal resources. */ TERMINATED, /** - * A service in this state has encountered a problem and may not be operational. It cannot be - * started nor stopped. + * A service in this state has encountered a problem and may not be + * operational. It cannot be started nor stopped. */ FAILED } - - /** - * A listener for the various state changes that a {@link Service} goes through in its lifecycle. - * - * @author Luke Sandberg - * @since 13.0 - */ - @Beta // should come out of Beta when Service does - interface Listener { - /** - * Called when the service transitions from {@linkplain State#NEW NEW} to - * {@linkplain State#STARTING STARTING}. This occurs when {@link Service#start} or - * {@link Service#startAndWait} is called the first time. - */ - void starting(); - - /** - * Called when the service transitions from {@linkplain State#STARTING STARTING} to - * {@linkplain State#RUNNING RUNNING}. This occurs when a service has successfully started. - */ - void running(); - - /** - * Called when the service transitions to the {@linkplain State#STOPPING STOPPING} state. The - * only valid values for {@code from} are {@linkplain State#STARTING STARTING} or - * {@linkplain State#RUNNING RUNNING}. This occurs when {@link Service#stop} is called. - * - * @param from The previous state that is being transitioned from. - */ - void stopping(State from); - - /** - * Called when the service transitions to the {@linkplain State#TERMINATED TERMINATED} state. - * The {@linkplain State#TERMINATED TERMINATED} state is a terminal state in the transition - * diagram. Therefore, if this method is called, no other methods will be called on the - * {@link Listener}. - * - * @param from The previous state that is being transitioned from. The only valid values for - * this are {@linkplain State#NEW NEW}, {@linkplain State#RUNNING RUNNING} or - * {@linkplain State#STOPPING STOPPING}. - */ - void terminated(State from); - - /** - * Called when the service transitions to the {@linkplain State#FAILED FAILED} state. The - * {@linkplain State#FAILED FAILED} state is a terminal state in the transition diagram. - * Therefore, if this method is called, no other methods will be called on the {@link Listener}. - * - * @param from The previous state that is being transitioned from. Failure can occur in any - * state with the exception of {@linkplain State#NEW NEW} or - * {@linkplain State#TERMINATED TERMINATED}. - * @param failure The exception that caused the failure. - */ - void failed(State from, Throwable failure); - } } diff --git a/guava/src/com/google/common/util/concurrent/ServiceManager.java b/guava/src/com/google/common/util/concurrent/ServiceManager.java deleted file mode 100644 index c779b23..0000000 --- a/guava/src/com/google/common/util/concurrent/ServiceManager.java +++ /dev/null @@ -1,724 +0,0 @@ -/* - * Copyright (C) 2012 The Guava Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.common.util.concurrent; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -import com.google.common.annotations.Beta; -import com.google.common.base.Function; -import com.google.common.base.Objects; -import com.google.common.base.Stopwatch; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Ordering; -import com.google.common.collect.Queues; -import com.google.common.util.concurrent.Service.State; - -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.annotation.concurrent.GuardedBy; -import javax.annotation.concurrent.Immutable; -import javax.inject.Inject; -import javax.inject.Singleton; - -/** - * A manager for monitoring and controlling a set of {@link Service services}. This class provides - * methods for {@linkplain #startAsync() starting}, {@linkplain #stopAsync() stopping} and - * {@linkplain #servicesByState inspecting} a collection of {@linkplain Service services}. - * Additionally, users can monitor state transitions with the {@link Listener listener} mechanism. - * - * <p>While it is recommended that service lifecycles be managed via this class, state transitions - * initiated via other mechanisms do not impact the correctness of its methods. For example, if the - * services are started by some mechanism besides {@link #startAsync}, the listeners will be invoked - * when appropriate and {@link #awaitHealthy} will still work as expected. - * - * <p>Here is a simple example of how to use a {@link ServiceManager} to start a server. - * <pre> {@code - * class Server { - * public static void main(String[] args) { - * Set<Service> services = ...; - * ServiceManager manager = new ServiceManager(services); - * manager.addListener(new Listener() { - * public void stopped() {} - * public void healthy() { - * // Services have been initialized and are healthy, start accepting requests... - * } - * public void failure(Service service) { - * // Something failed, at this point we could log it, notify a load balancer, or take - * // some other action. For now we will just exit. - * System.exit(1); - * } - * }, - * MoreExecutors.sameThreadExecutor()); - * - * Runtime.getRuntime().addShutdownHook(new Thread() { - * public void run() { - * // Give the services 5 seconds to stop to ensure that we are responsive to shutdown - * // requests. - * try { - * manager.stopAsync().awaitStopped(5, TimeUnit.SECONDS); - * } catch (TimeoutException timeout) { - * // stopping timed out - * } - * } - * }); - * manager.startAsync(); // start all the services asynchronously - * } - * }}</pre> - * - * This class uses the ServiceManager's methods to start all of its services, to respond to service - * failure and to ensure that when the JVM is shutting down all the services are stopped. - * - * @author Luke Sandberg - * @since 14.0 - */ -@Beta -@Singleton -public final class ServiceManager { - private static final Logger logger = Logger.getLogger(ServiceManager.class.getName()); - - /** - * A listener for the aggregate state changes of the services that are under management. Users - * that need to listen to more fine-grained events (such as when each particular - * {@link Service service} starts, or terminates), should attach {@link Service.Listener service - * listeners} to each individual service. - * - * @author Luke Sandberg - * @since 14.0 - */ - @Beta // Should come out of Beta when ServiceManager does - public static interface Listener { - /** - * Called when the service initially becomes healthy. - * - * <p>This will be called at most once after all the services have entered the - * {@linkplain State#RUNNING running} state. If any services fail during start up or - * {@linkplain State#FAILED fail}/{@linkplain State#TERMINATED terminate} before all other - * services have started {@linkplain State#RUNNING running} then this method will not be called. - */ - void healthy(); - - /** - * Called when the all of the component services have reached a terminal state, either - * {@linkplain State#TERMINATED terminated} or {@linkplain State#FAILED failed}. - */ - void stopped(); - - /** - * Called when a component service has {@linkplain State#FAILED failed}. - * - * @param service The service that failed. - */ - void failure(Service service); - } - - /** - * An encapsulation of all of the state that is accessed by the {@linkplain ServiceListener - * service listeners}. This is extracted into its own object so that {@link ServiceListener} - * could be made {@code static} and its instances can be safely constructed and added in the - * {@link ServiceManager} constructor without having to close over the partially constructed - * {@link ServiceManager} instance (i.e. avoid leaking a pointer to {@code this}). - */ - private final ServiceManagerState state; - private final ImmutableMap<Service, ServiceListener> services; - - /** - * Constructs a new instance for managing the given services. - * - * @param services The services to manage - * - * @throws IllegalArgumentException if not all services are {@link State#NEW new} or if there are - * any duplicate services. - */ - public ServiceManager(Iterable<? extends Service> services) { - ImmutableList<Service> copy = ImmutableList.copyOf(services); - this.state = new ServiceManagerState(copy.size()); - ImmutableMap.Builder<Service, ServiceListener> builder = ImmutableMap.builder(); - Executor executor = MoreExecutors.sameThreadExecutor(); - for (Service service : copy) { - ServiceListener listener = new ServiceListener(service, state); - service.addListener(listener, executor); - // We check the state after adding the listener as a way to ensure that our listener was added - // to a NEW service. - checkArgument(service.state() == State.NEW, "Can only manage NEW services, %s", service); - builder.put(service, listener); - } - this.services = builder.build(); - } - - /** - * Constructs a new instance for managing the given services. This constructor is provided so that - * dependency injection frameworks can inject instances of {@link ServiceManager}. - * - * @param services The services to manage - * - * @throws IllegalStateException if not all services are {@link State#NEW new}. - */ - @Inject ServiceManager(Set<Service> services) { - this((Iterable<Service>) services); - } - - /** - * Registers a {@link Listener} to be {@linkplain Executor#execute executed} on the given - * executor. The listener will not have previous state changes replayed, so it is - * suggested that listeners are added before any of the managed services are - * {@linkplain Service#start started}. - * - * <p>There is no guaranteed ordering of execution of listeners, but any listener added through - * this method is guaranteed to be called whenever there is a state change. - * - * <p>Exceptions thrown by a listener will be propagated up to the executor. Any exception thrown - * during {@code Executor.execute} (e.g., a {@code RejectedExecutionException} or an exception - * thrown by {@linkplain MoreExecutors#sameThreadExecutor inline execution}) will be caught and - * logged. - * - * @param listener the listener to run when the manager changes state - * @param executor the executor in which the the listeners callback methods will be run. For fast, - * lightweight listeners that would be safe to execute in any thread, consider - * {@link MoreExecutors#sameThreadExecutor}. - */ - public void addListener(Listener listener, Executor executor) { - state.addListener(listener, executor); - } - - /** - * Initiates service {@linkplain Service#start startup} on all the services being managed. It is - * only valid to call this method if all of the services are {@linkplain State#NEW new}. - * - * @return this - * @throws IllegalStateException if any of the Services are not {@link State#NEW new} when the - * method is called, {@link State#TERMINATED terminated} or {@link State#FAILED failed}. - */ - public ServiceManager startAsync() { - for (Map.Entry<Service, ServiceListener> entry : services.entrySet()) { - Service service = entry.getKey(); - State state = service.state(); - checkState(state == State.NEW, "Service %s is %s, cannot start it.", service, - state); - } - for (ServiceListener service : services.values()) { - service.start(); - } - return this; - } - - /** - * Waits for the {@link ServiceManager} to become {@linkplain #isHealthy() healthy}. The manager - * will become healthy after all the component services have reached the {@linkplain State#RUNNING - * running} state. - * - * @throws IllegalStateException if the service manager reaches a state from which it cannot - * become {@linkplain #isHealthy() healthy}. - */ - public void awaitHealthy() { - state.awaitHealthy(); - checkState(isHealthy(), "Expected to be healthy after starting"); - } - - /** - * Waits for the {@link ServiceManager} to become {@linkplain #isHealthy() healthy} for no more - * than the given time. The manager will become healthy after all the component services have - * reached the {@linkplain State#RUNNING running} state. - * - * @param timeout the maximum time to wait - * @param unit the time unit of the timeout argument - * @throws TimeoutException if not all of the services have finished starting within the deadline - * @throws IllegalStateException if the service manager reaches a state from which it cannot - * become {@linkplain #isHealthy() healthy}. - */ - public void awaitHealthy(long timeout, TimeUnit unit) throws TimeoutException { - if (!state.awaitHealthy(timeout, unit)) { - // It would be nice to tell the caller who we are still waiting on, and this information is - // likely to be in servicesByState(), however due to race conditions we can't actually tell - // which services are holding up healthiness. The current set of NEW or STARTING services is - // likely to point out the culprit, but may not. If we really wanted to solve this we could - // change state to track exactly which services have started and then we could accurately - // report on this. But it is only for logging so we likely don't care. - throw new TimeoutException("Timeout waiting for the services to become healthy."); - } - checkState(isHealthy(), "Expected to be healthy after starting"); - } - - /** - * Initiates service {@linkplain Service#stop shutdown} if necessary on all the services being - * managed. - * - * @return this - */ - public ServiceManager stopAsync() { - for (Service service : services.keySet()) { - service.stop(); - } - return this; - } - - /** - * Waits for the all the services to reach a terminal state. After this method returns all - * services will either be {@link Service.State#TERMINATED terminated} or - * {@link Service.State#FAILED failed} - */ - public void awaitStopped() { - state.awaitStopped(); - } - - /** - * Waits for the all the services to reach a terminal state for no more than the given time. After - * this method returns all services will either be {@link Service.State#TERMINATED terminated} or - * {@link Service.State#FAILED failed} - * - * @param timeout the maximum time to wait - * @param unit the time unit of the timeout argument - * @throws TimeoutException if not all of the services have stopped within the deadline - */ - public void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException { - if (!state.awaitStopped(timeout, unit)) { - throw new TimeoutException("Timeout waiting for the services to stop."); - } - } - - /** - * Returns true if all services are currently in the {@linkplain State#RUNNING running} state. - * - * <p>Users who want more detailed information should use the {@link #servicesByState} method to - * get detailed information about which services are not running. - */ - public boolean isHealthy() { - for (Service service : services.keySet()) { - if (!service.isRunning()) { - return false; - } - } - return true; - } - - /** - * Provides a snapshot of the current state of all the services under management. - * - * <p>N.B. This snapshot it not guaranteed to be consistent, i.e. the set of states returned may - * not correspond to any particular point in time view of the services. - */ - public ImmutableMultimap<State, Service> servicesByState() { - ImmutableMultimap.Builder<State, Service> builder = ImmutableMultimap.builder(); - for (Service service : services.keySet()) { - builder.put(service.state(), service); - } - return builder.build(); - } - - /** - * Returns the service load times. This value will only return startup times for services that - * have finished starting. - * - * @return Map of services and their corresponding startup time in millis, the map entries will be - * ordered by startup time. - */ - public ImmutableMap<Service, Long> startupTimes() { - Map<Service, Long> loadTimeMap = Maps.newHashMapWithExpectedSize(services.size()); - for (Map.Entry<Service, ServiceListener> entry : services.entrySet()) { - State state = entry.getKey().state(); - if (state != State.NEW && state != State.STARTING) { - loadTimeMap.put(entry.getKey(), entry.getValue().startupTimeMillis()); - } - } - List<Entry<Service, Long>> servicesByStartTime = Ordering.<Long>natural() - .onResultOf(new Function<Map.Entry<Service, Long>, Long>() { - @Override public Long apply(Map.Entry<Service, Long> input) { - return input.getValue(); - } - }) - .sortedCopy(loadTimeMap.entrySet()); - ImmutableMap.Builder<Service, Long> builder = ImmutableMap.builder(); - for (Map.Entry<Service, Long> entry : servicesByStartTime) { - builder.put(entry); - } - return builder.build(); - } - - @Override public String toString() { - return Objects.toStringHelper(ServiceManager.class) - .add("services", services.keySet()) - .toString(); - } - - /** - * An encapsulation of all the mutable state of the {@link ServiceManager} that needs to be - * accessed by instances of {@link ServiceListener}. - */ - private static final class ServiceManagerState { - final Monitor monitor = new Monitor(); - final int numberOfServices; - /** The number of services that have not finished starting up. */ - @GuardedBy("monitor") - int unstartedServices; - /** The number of services that have not reached a terminal state. */ - @GuardedBy("monitor") - int unstoppedServices; - /** - * Controls how long to wait for all the service manager to either become healthy or reach a - * state where it is guaranteed that it can never become healthy. - */ - final Monitor.Guard awaitHealthGuard = new Monitor.Guard(monitor) { - @Override public boolean isSatisfied() { - // All services have started or some service has terminated/failed. - return unstartedServices == 0 || unstoppedServices != numberOfServices; - } - }; - /** - * Controls how long to wait for all services to reach a terminal state. - */ - final Monitor.Guard stoppedGuard = new Monitor.Guard(monitor) { - @Override public boolean isSatisfied() { - return unstoppedServices == 0; - } - }; - /** The listeners to notify during a state transition. */ - @GuardedBy("monitor") - final List<ListenerExecutorPair> listeners = Lists.newArrayList(); - /** - * The queue of listeners that are waiting to be executed. - * - * <p>Enqueue operations should be protected by {@link #monitor} while dequeue operations should - * be protected by the implicit lock on this object. This is to ensure that listeners are - * executed in the correct order and also so that a listener can not hold the {@link #monitor} - * for an arbitrary amount of time (listeners can only block other listeners, not internal state - * transitions). We use a concurrent queue implementation so that enqueues can be executed - * concurrently with dequeues. - */ - @GuardedBy("queuedListeners") - final Queue<Runnable> queuedListeners = Queues.newConcurrentLinkedQueue(); - - ServiceManagerState(int numberOfServices) { - this.numberOfServices = numberOfServices; - this.unstoppedServices = numberOfServices; - this.unstartedServices = numberOfServices; - } - - void addListener(Listener listener, Executor executor) { - checkNotNull(listener, "listener"); - checkNotNull(executor, "executor"); - monitor.enter(); - try { - // no point in adding a listener that will never be called - if (unstartedServices > 0 || unstoppedServices > 0) { - listeners.add(new ListenerExecutorPair(listener, executor)); - } - } finally { - monitor.leave(); - } - } - - void awaitHealthy() { - monitor.enter(); - try { - monitor.waitForUninterruptibly(awaitHealthGuard); - } finally { - monitor.leave(); - } - } - - boolean awaitHealthy(long timeout, TimeUnit unit) { - monitor.enter(); - try { - if (monitor.waitForUninterruptibly(awaitHealthGuard, timeout, unit)) { - return true; - } - return false; - } finally { - monitor.leave(); - } - } - - void awaitStopped() { - monitor.enter(); - try { - monitor.waitForUninterruptibly(stoppedGuard); - } finally { - monitor.leave(); - } - } - - boolean awaitStopped(long timeout, TimeUnit unit) { - monitor.enter(); - try { - return monitor.waitForUninterruptibly(stoppedGuard, timeout, unit); - } finally { - monitor.leave(); - } - } - - /** - * This should be called when a service finishes starting up. - * - * @param currentlyHealthy whether or not the service that finished starting was healthy at the - * time that it finished starting. - */ - @GuardedBy("monitor") - private void serviceFinishedStarting(Service service, boolean currentlyHealthy) { - checkState(unstartedServices > 0, - "All services should have already finished starting but %s just finished.", service); - unstartedServices--; - if (currentlyHealthy && unstartedServices == 0 && unstoppedServices == numberOfServices) { - // This means that the manager is currently healthy, or at least it should have been - // healthy at some point from some perspective. Calling isHealthy is not currently - // guaranteed to return true because any service could fail right now. However, the - // happens-before relationship enforced by the monitor ensures that this method was called - // before either serviceTerminated or serviceFailed, so we know that the manager was at - // least healthy for some period of time. Furthermore we are guaranteed that this call to - // healthy() will be before any call to terminated() or failure(Service) on the listener. - // So it is correct to execute the listener's health() callback. - for (final ListenerExecutorPair pair : listeners) { - queuedListeners.add(new Runnable() { - @Override public void run() { - pair.execute(new Runnable() { - @Override public void run() { - pair.listener.healthy(); - } - }); - } - }); - } - } - } - - /** - * This should be called when a service is {@linkplain State#TERMINATED terminated}. - */ - @GuardedBy("monitor") - private void serviceTerminated(Service service) { - serviceStopped(service); - } - - /** - * This should be called when a service is {@linkplain State#FAILED failed}. - */ - @GuardedBy("monitor") - private void serviceFailed(final Service service) { - for (final ListenerExecutorPair pair : listeners) { - queuedListeners.add(new Runnable() { - @Override public void run() { - pair.execute(new Runnable() { - @Override public void run() { - pair.listener.failure(service); - } - }); - } - }); - } - serviceStopped(service); - } - - /** - * Should be called whenever a service reaches a terminal state ( - * {@linkplain State#TERMINATED terminated} or - * {@linkplain State#FAILED failed}). - */ - @GuardedBy("monitor") - private void serviceStopped(Service service) { - checkState(unstoppedServices > 0, - "All services should have already stopped but %s just stopped.", service); - unstoppedServices--; - if (unstoppedServices == 0) { - checkState(unstartedServices == 0, - "All services are stopped but %d services haven't finished starting", - unstartedServices); - for (final ListenerExecutorPair pair : listeners) { - queuedListeners.add(new Runnable() { - @Override public void run() { - pair.execute(new Runnable() { - @Override public void run() { - pair.listener.stopped(); - } - }); - } - }); - } - // no more listeners could possibly be called, so clear them out - listeners.clear(); - } - } - - /** - * Attempts to execute all the listeners in {@link #queuedListeners}. - */ - private void executeListeners() { - checkState(!monitor.isOccupiedByCurrentThread(), - "It is incorrect to execute listeners with the monitor held."); - synchronized (queuedListeners) { - Runnable listener; - while ((listener = queuedListeners.poll()) != null) { - listener.run(); - } - } - } - } - - /** - * A {@link Service} that wraps another service and times how long it takes for it to start and - * also calls the {@link ServiceManagerState#serviceFinishedStarting}, - * {@link ServiceManagerState#serviceTerminated} and {@link ServiceManagerState#serviceFailed} - * according to its current state. - */ - private static final class ServiceListener implements Service.Listener { - @GuardedBy("watch") // AFAICT Stopwatch is not thread safe so we need to protect accesses - final Stopwatch watch = new Stopwatch(); - final Service service; - final ServiceManagerState state; - - /** - * @param service the service that - */ - ServiceListener(Service service, ServiceManagerState state) { - this.service = service; - this.state = state; - } - - @Override public void starting() { - // This can happen if someone besides the ServiceManager starts the service, in this case - // our timings may be inaccurate. - startTimer(); - } - - @Override public void running() { - state.monitor.enter(); - try { - finishedStarting(true); - } finally { - state.monitor.leave(); - state.executeListeners(); - } - } - - @Override public void stopping(State from) { - if (from == State.STARTING) { - state.monitor.enter(); - try { - finishedStarting(false); - } finally { - state.monitor.leave(); - state.executeListeners(); - } - } - } - - @Override public void terminated(State from) { - logger.info("Service " + service + " has terminated. Previous state was " + from + " state."); - state.monitor.enter(); - try { - if (from == State.NEW) { - // startTimer is idempotent, so this is safe to call and it may be necessary if no one has - // started the timer yet. - startTimer(); - finishedStarting(false); - } - state.serviceTerminated(service); - } finally { - state.monitor.leave(); - state.executeListeners(); - } - } - - @Override public void failed(State from, Throwable failure) { - logger.log(Level.SEVERE, "Service " + service + " has failed in the " + from + " state.", - failure); - state.monitor.enter(); - try { - if (from == State.STARTING) { - finishedStarting(false); - } - state.serviceFailed(service); - } finally { - state.monitor.leave(); - state.executeListeners(); - } - } - - /** - * Stop the stopwatch, log the startup time and decrement the startup latch - * - * @param currentlyHealthy whether or not the service that finished starting is currently - * healthy - */ - @GuardedBy("monitor") - void finishedStarting(boolean currentlyHealthy) { - synchronized (watch) { - watch.stop(); - logger.log(Level.INFO, "Started " + service + " in " + startupTimeMillis() + " ms."); - } - state.serviceFinishedStarting(service, currentlyHealthy); - } - - void start() { - startTimer(); - service.start(); - } - - /** Start the timer if it hasn't been started. */ - void startTimer() { - synchronized (watch) { - if (!watch.isRunning()) { // only start the watch once. - watch.start(); - logger.log(Level.INFO, "Starting {0}", service); - } - } - } - - /** Returns the amount of time it took for the service to finish starting in milliseconds. */ - synchronized long startupTimeMillis() { - synchronized (watch) { - return watch.elapsed(MILLISECONDS); - } - } - } - - /** Simple value object binding a listener to its executor. */ - @Immutable private static final class ListenerExecutorPair { - final Listener listener; - final Executor executor; - - ListenerExecutorPair(Listener listener, Executor executor) { - this.listener = listener; - this.executor = executor; - } - - /** - * Executes the given {@link Runnable} on {@link #executor} logging and swallowing all - * exceptions - */ - void execute(Runnable runnable) { - try { - executor.execute(runnable); - } catch (Exception e) { - logger.log(Level.SEVERE, "Exception while executing listener " + listener - + " with executor " + executor, e); - } - } - } -} diff --git a/guava/src/com/google/common/util/concurrent/Striped.java b/guava/src/com/google/common/util/concurrent/Striped.java deleted file mode 100644 index 3c426f0..0000000 --- a/guava/src/com/google/common/util/concurrent/Striped.java +++ /dev/null @@ -1,376 +0,0 @@ -/* - * Copyright (C) 2011 The Guava Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.common.util.concurrent; - -import com.google.common.annotations.Beta; -import com.google.common.base.Functions; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.collect.Iterables; -import com.google.common.collect.MapMaker; -import com.google.common.math.IntMath; -import com.google.common.primitives.Ints; - -import java.math.RoundingMode; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Semaphore; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - * A striped {@code Lock/Semaphore/ReadWriteLock}. This offers the underlying lock striping - * similar to that of {@code ConcurrentHashMap} in a reusable form, and extends it for - * semaphores and read-write locks. Conceptually, lock striping is the technique of dividing a lock - * into many <i>stripes</i>, increasing the granularity of a single lock and allowing independent - * operations to lock different stripes and proceed concurrently, instead of creating contention - * for a single lock. - * - * <p>The guarantee provided by this class is that equal keys lead to the same lock (or semaphore), - * i.e. {@code if (key1.equals(key2))} then {@code striped.get(key1) == striped.get(key2)} - * (assuming {@link Object#hashCode()} is correctly implemented for the keys). Note - * that if {@code key1} is <strong>not</strong> equal to {@code key2}, it is <strong>not</strong> - * guaranteed that {@code striped.get(key1) != striped.get(key2)}; the elements might nevertheless - * be mapped to the same lock. The lower the number of stripes, the higher the probability of this - * happening. - * - * <p>There are three flavors of this class: {@code Striped<Lock>}, {@code Striped<Semaphore>}, - * and {@code Striped<ReadWriteLock>}. For each type, two implementations are offered: - * {@linkplain #lock(int) strong} and {@linkplain #lazyWeakLock(int) weak} - * {@code Striped<Lock>}, {@linkplain #semaphore(int, int) strong} and {@linkplain - * #lazyWeakSemaphore(int, int) weak} {@code Striped<Semaphore>}, and {@linkplain - * #readWriteLock(int) strong} and {@linkplain #lazyWeakReadWriteLock(int) weak} - * {@code Striped<ReadWriteLock>}. <i>Strong</i> means that all stripes (locks/semaphores) are - * initialized eagerly, and are not reclaimed unless {@code Striped} itself is reclaimable. - * <i>Weak</i> means that locks/semaphores are created lazily, and they are allowed to be reclaimed - * if nobody is holding on to them. This is useful, for example, if one wants to create a {@code - * Striped<Lock>} of many locks, but worries that in most cases only a small portion of these - * would be in use. - * - * <p>Prior to this class, one might be tempted to use {@code Map<K, Lock>}, where {@code K} - * represents the task. This maximizes concurrency by having each unique key mapped to a unique - * lock, but also maximizes memory footprint. On the other extreme, one could use a single lock - * for all tasks, which minimizes memory footprint but also minimizes concurrency. Instead of - * choosing either of these extremes, {@code Striped} allows the user to trade between required - * concurrency and memory footprint. For example, if a set of tasks are CPU-bound, one could easily - * create a very compact {@code Striped<Lock>} of {@code availableProcessors() * 4} stripes, - * instead of possibly thousands of locks which could be created in a {@code Map<K, Lock>} - * structure. - * - * @author Dimitris Andreou - * @since 13.0 - */ -@Beta -public abstract class Striped<L> { - private Striped() {} - - /** - * Returns the stripe that corresponds to the passed key. It is always guaranteed that if - * {@code key1.equals(key2)}, then {@code get(key1) == get(key2)}. - * - * @param key an arbitrary, non-null key - * @return the stripe that the passed key corresponds to - */ - public abstract L get(Object key); - - /** - * Returns the stripe at the specified index. Valid indexes are 0, inclusively, to - * {@code size()}, exclusively. - * - * @param index the index of the stripe to return; must be in {@code [0...size())} - * @return the stripe at the specified index - */ - public abstract L getAt(int index); - - /** - * Returns the index to which the given key is mapped, so that getAt(indexFor(key)) == get(key). - */ - abstract int indexFor(Object key); - - /** - * Returns the total number of stripes in this instance. - */ - public abstract int size(); - - /** - * Returns the stripes that correspond to the passed objects, in ascending (as per - * {@link #getAt(int)}) order. Thus, threads that use the stripes in the order returned - * by this method are guaranteed to not deadlock each other. - * - * <p>It should be noted that using a {@code Striped<L>} with relatively few stripes, and - * {@code bulkGet(keys)} with a relative large number of keys can cause an excessive number - * of shared stripes (much like the birthday paradox, where much fewer than anticipated birthdays - * are needed for a pair of them to match). Please consider carefully the implications of the - * number of stripes, the intended concurrency level, and the typical number of keys used in a - * {@code bulkGet(keys)} operation. See <a href="http://www.mathpages.com/home/kmath199.htm">Balls - * in Bins model</a> for mathematical formulas that can be used to estimate the probability of - * collisions. - * - * @param keys arbitrary non-null keys - * @return the stripes corresponding to the objects (one per each object, derived by delegating - * to {@link #get(Object)}; may contain duplicates), in an increasing index order. - */ - public Iterable<L> bulkGet(Iterable<?> keys) { - // Initially using the array to store the keys, then reusing it to store the respective L's - final Object[] array = Iterables.toArray(keys, Object.class); - int[] stripes = new int[array.length]; - for (int i = 0; i < array.length; i++) { - stripes[i] = indexFor(array[i]); - } - Arrays.sort(stripes); - for (int i = 0; i < array.length; i++) { - array[i] = getAt(stripes[i]); - } - /* - * Note that the returned Iterable holds references to the returned stripes, to avoid - * error-prone code like: - * - * Striped<Lock> stripedLock = Striped.lazyWeakXXX(...)' - * Iterable<Lock> locks = stripedLock.bulkGet(keys); - * for (Lock lock : locks) { - * lock.lock(); - * } - * operation(); - * for (Lock lock : locks) { - * lock.unlock(); - * } - * - * If we only held the int[] stripes, translating it on the fly to L's, the original locks - * might be garbage collected after locking them, ending up in a huge mess. - */ - @SuppressWarnings("unchecked") // we carefully replaced all keys with their respective L's - List<L> asList = (List<L>) Arrays.asList(array); - return Collections.unmodifiableList(asList); - } - - // Static factories - - /** - * Creates a {@code Striped<Lock>} with eagerly initialized, strongly referenced locks, with the - * specified fairness. Every lock is reentrant. - * - * @param stripes the minimum number of stripes (locks) required - * @return a new {@code Striped<Lock>} - */ - public static Striped<Lock> lock(int stripes) { - return new CompactStriped<Lock>(stripes, new Supplier<Lock>() { - public Lock get() { - return new PaddedLock(); - } - }); - } - - /** - * Creates a {@code Striped<Lock>} with lazily initialized, weakly referenced locks, with the - * specified fairness. Every lock is reentrant. - * - * @param stripes the minimum number of stripes (locks) required - * @return a new {@code Striped<Lock>} - */ - public static Striped<Lock> lazyWeakLock(int stripes) { - return new LazyStriped<Lock>(stripes, new Supplier<Lock>() { - public Lock get() { - return new ReentrantLock(false); - } - }); - } - - /** - * Creates a {@code Striped<Semaphore>} with eagerly initialized, strongly referenced semaphores, - * with the specified number of permits and fairness. - * - * @param stripes the minimum number of stripes (semaphores) required - * @param permits the number of permits in each semaphore - * @return a new {@code Striped<Semaphore>} - */ - public static Striped<Semaphore> semaphore(int stripes, final int permits) { - return new CompactStriped<Semaphore>(stripes, new Supplier<Semaphore>() { - public Semaphore get() { - return new PaddedSemaphore(permits); - } - }); - } - - /** - * Creates a {@code Striped<Semaphore>} with lazily initialized, weakly referenced semaphores, - * with the specified number of permits and fairness. - * - * @param stripes the minimum number of stripes (semaphores) required - * @param permits the number of permits in each semaphore - * @return a new {@code Striped<Semaphore>} - */ - public static Striped<Semaphore> lazyWeakSemaphore(int stripes, final int permits) { - return new LazyStriped<Semaphore>(stripes, new Supplier<Semaphore>() { - public Semaphore get() { - return new Semaphore(permits, false); - } - }); - } - - /** - * Creates a {@code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced - * read-write locks, with the specified fairness. Every lock is reentrant. - * - * @param stripes the minimum number of stripes (locks) required - * @return a new {@code Striped<ReadWriteLock>} - */ - public static Striped<ReadWriteLock> readWriteLock(int stripes) { - return new CompactStriped<ReadWriteLock>(stripes, READ_WRITE_LOCK_SUPPLIER); - } - - /** - * Creates a {@code Striped<ReadWriteLock>} with lazily initialized, weakly referenced - * read-write locks, with the specified fairness. Every lock is reentrant. - * - * @param stripes the minimum number of stripes (locks) required - * @return a new {@code Striped<ReadWriteLock>} - */ - public static Striped<ReadWriteLock> lazyWeakReadWriteLock(int stripes) { - return new LazyStriped<ReadWriteLock>(stripes, READ_WRITE_LOCK_SUPPLIER); - } - - // ReentrantReadWriteLock is large enough to make padding probably unnecessary - private static final Supplier<ReadWriteLock> READ_WRITE_LOCK_SUPPLIER = - new Supplier<ReadWriteLock>() { - public ReadWriteLock get() { - return new ReentrantReadWriteLock(); - } - }; - - private abstract static class PowerOfTwoStriped<L> extends Striped<L> { - /** Capacity (power of two) minus one, for fast mod evaluation */ - final int mask; - - PowerOfTwoStriped(int stripes) { - Preconditions.checkArgument(stripes > 0, "Stripes must be positive"); - this.mask = stripes > Ints.MAX_POWER_OF_TWO ? ALL_SET : ceilToPowerOfTwo(stripes) - 1; - } - - @Override final int indexFor(Object key) { - int hash = smear(key.hashCode()); - return hash & mask; - } - - @Override public final L get(Object key) { - return getAt(indexFor(key)); - } - } - - /** - * Implementation of Striped where 2^k stripes are represented as an array of the same length, - * eagerly initialized. - */ - private static class CompactStriped<L> extends PowerOfTwoStriped<L> { - /** Size is a power of two. */ - private final Object[] array; - - private CompactStriped(int stripes, Supplier<L> supplier) { - super(stripes); - Preconditions.checkArgument(stripes <= Ints.MAX_POWER_OF_TWO, "Stripes must be <= 2^30)"); - - this.array = new Object[mask + 1]; - for (int i = 0; i < array.length; i++) { - array[i] = supplier.get(); - } - } - - @SuppressWarnings("unchecked") // we only put L's in the array - @Override public L getAt(int index) { - return (L) array[index]; - } - - @Override public int size() { - return array.length; - } - } - - /** - * Implementation of Striped where up to 2^k stripes can be represented, using a Cache - * where the key domain is [0..2^k). To map a user key into a stripe, we take a k-bit slice of the - * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced. - */ - private static class LazyStriped<L> extends PowerOfTwoStriped<L> { - final ConcurrentMap<Integer, L> cache; - final int size; - - LazyStriped(int stripes, Supplier<L> supplier) { - super(stripes); - this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1; - this.cache = new MapMaker().weakValues().makeComputingMap(Functions.forSupplier(supplier)); - } - - @Override public L getAt(int index) { - Preconditions.checkElementIndex(index, size()); - return cache.get(index); - } - - @Override public int size() { - return size; - } - } - - /** - * A bit mask were all bits are set. - */ - private static final int ALL_SET = ~0; - - private static int ceilToPowerOfTwo(int x) { - return 1 << IntMath.log2(x, RoundingMode.CEILING); - } - - /* - * This method was written by Doug Lea with assistance from members of JCP - * JSR-166 Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain - * - * As of 2010/06/11, this method is identical to the (package private) hash - * method in OpenJDK 7's java.util.HashMap class. - */ - // Copied from java/com/google/common/collect/Hashing.java - private static int smear(int hashCode) { - hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12); - return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4); - } - - private static class PaddedLock extends ReentrantLock { - /* - * Padding from 40 into 64 bytes, same size as cache line. Might be beneficial to add - * a fourth long here, to minimize chance of interference between consecutive locks, - * but I couldn't observe any benefit from that. - */ - @SuppressWarnings("unused") - long q1, q2, q3; - - PaddedLock() { - super(false); - } - } - - private static class PaddedSemaphore extends Semaphore { - // See PaddedReentrantLock comment - @SuppressWarnings("unused") - long q1, q2, q3; - - PaddedSemaphore(int permits) { - super(permits, false); - } - } -} diff --git a/guava/src/com/google/common/util/concurrent/ThreadFactoryBuilder.java b/guava/src/com/google/common/util/concurrent/ThreadFactoryBuilder.java index a05247e..167ad11 100644 --- a/guava/src/com/google/common/util/concurrent/ThreadFactoryBuilder.java +++ b/guava/src/com/google/common/util/concurrent/ThreadFactoryBuilder.java @@ -61,12 +61,9 @@ public final class ThreadFactoryBuilder { * @param nameFormat a {@link String#format(String, Object...)}-compatible * format String, to which a unique integer (0, 1, etc.) will be supplied * as the single parameter. This integer will be unique to the built - * instance of the ThreadFactory and will be assigned sequentially. For - * example, {@code "rpc-pool-%d"} will generate thread names like - * {@code "rpc-pool-0"}, {@code "rpc-pool-1"}, {@code "rpc-pool-2"}, etc. + * instance of the ThreadFactory and will be assigned sequentially. * @return this for the builder pattern */ - @SuppressWarnings("ReturnValueIgnored") public ThreadFactoryBuilder setNameFormat(String nameFormat) { String.format(nameFormat, 0); // fail fast if the format is bad or null this.nameFormat = nameFormat; diff --git a/guava/src/com/google/common/util/concurrent/UncheckedExecutionException.java b/guava/src/com/google/common/util/concurrent/UncheckedExecutionException.java index 77afdc3..c0c99e1 100644 --- a/guava/src/com/google/common/util/concurrent/UncheckedExecutionException.java +++ b/guava/src/com/google/common/util/concurrent/UncheckedExecutionException.java @@ -16,10 +16,9 @@ package com.google.common.util.concurrent; +import com.google.common.annotations.Beta; import com.google.common.annotations.GwtCompatible; -import javax.annotation.Nullable; - /** * Unchecked variant of {@link java.util.concurrent.ExecutionException}. As with * {@code ExecutionException}, the exception's {@linkplain #getCause() cause} @@ -27,16 +26,17 @@ import javax.annotation.Nullable; * * <p>{@code UncheckedExecutionException} is intended as an alternative to * {@code ExecutionException} when the exception thrown by a task is an - * unchecked exception. However, it may also wrap a checked exception in some - * cases. + * unchecked exception. This allows the client code to continue to distinguish + * between checked and unchecked exceptions, even when they come from other + * threads. * * <p>When wrapping an {@code Error} from another thread, prefer {@link - * ExecutionError}. When wrapping a checked exception, prefer {@code - * ExecutionException}. + * ExecutionError}. * * @author Charles Fry * @since 10.0 */ +@Beta @GwtCompatible public class UncheckedExecutionException extends RuntimeException { /** @@ -47,21 +47,21 @@ public class UncheckedExecutionException extends RuntimeException { /** * Creates a new instance with the given detail message. */ - protected UncheckedExecutionException(@Nullable String message) { + protected UncheckedExecutionException(String message) { super(message); } /** * Creates a new instance with the given detail message and cause. */ - public UncheckedExecutionException(@Nullable String message, @Nullable Throwable cause) { + public UncheckedExecutionException(String message, Throwable cause) { super(message, cause); } /** * Creates a new instance with the given cause. */ - public UncheckedExecutionException(@Nullable Throwable cause) { + public UncheckedExecutionException(Throwable cause) { super(cause); } diff --git a/guava/src/com/google/common/util/concurrent/UncheckedTimeoutException.java b/guava/src/com/google/common/util/concurrent/UncheckedTimeoutException.java index cefe379..d821c84 100644 --- a/guava/src/com/google/common/util/concurrent/UncheckedTimeoutException.java +++ b/guava/src/com/google/common/util/concurrent/UncheckedTimeoutException.java @@ -16,8 +16,6 @@ package com.google.common.util.concurrent; -import javax.annotation.Nullable; - /** * Unchecked version of {@link java.util.concurrent.TimeoutException}. * @@ -27,15 +25,15 @@ import javax.annotation.Nullable; public class UncheckedTimeoutException extends RuntimeException { public UncheckedTimeoutException() {} - public UncheckedTimeoutException(@Nullable String message) { + public UncheckedTimeoutException(String message) { super(message); } - public UncheckedTimeoutException(@Nullable Throwable cause) { + public UncheckedTimeoutException(Throwable cause) { super(cause); } - public UncheckedTimeoutException(@Nullable String message, @Nullable Throwable cause) { + public UncheckedTimeoutException(String message, Throwable cause) { super(message, cause); } diff --git a/guava/src/com/google/common/util/concurrent/Uninterruptibles.java b/guava/src/com/google/common/util/concurrent/Uninterruptibles.java index 79d7598..89f30b8 100644 --- a/guava/src/com/google/common/util/concurrent/Uninterruptibles.java +++ b/guava/src/com/google/common/util/concurrent/Uninterruptibles.java @@ -122,9 +122,6 @@ public final class Uninterruptibles { * <p>If instead, you wish to treat {@link InterruptedException} uniformly * with other exceptions, see {@link Futures#get(Future, Class) Futures.get} * or {@link Futures#makeChecked}. - * - * @throws ExecutionException if the computation threw an exception - * @throws CancellationException if the computation was cancelled */ public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException { @@ -152,10 +149,6 @@ public final class Uninterruptibles { * <p>If instead, you wish to treat {@link InterruptedException} uniformly * with other exceptions, see {@link Futures#get(Future, Class) Futures.get} * or {@link Futures#makeChecked}. - * - * @throws ExecutionException if the computation threw an exception - * @throws CancellationException if the computation was cancelled - * @throws TimeoutException if the wait timed out */ public static <V> V getUninterruptibly( Future<V> future, long timeout, TimeUnit unit) @@ -233,11 +226,6 @@ public final class Uninterruptibles { /** * Invokes {@code queue.}{@link BlockingQueue#put(Object) put(element)} * uninterruptibly. - * - * @throws ClassCastException if the class of the specified element prevents - * it from being added to the given queue - * @throws IllegalArgumentException if some property of the specified element - * prevents it from being added to the given queue */ public static <E> void putUninterruptibly(BlockingQueue<E> queue, E element) { boolean interrupted = false; diff --git a/guava/src/com/google/common/util/concurrent/package-info.java b/guava/src/com/google/common/util/concurrent/package-info.java index 6ea5069..6281552 100644 --- a/guava/src/com/google/common/util/concurrent/package-info.java +++ b/guava/src/com/google/common/util/concurrent/package-info.java @@ -33,4 +33,3 @@ package com.google.common.util.concurrent; import javax.annotation.ParametersAreNonnullByDefault; - |