aboutsummaryrefslogtreecommitdiffstats
path: root/guava/src/com/google/common/util/concurrent/AbstractService.java
diff options
context:
space:
mode:
Diffstat (limited to 'guava/src/com/google/common/util/concurrent/AbstractService.java')
-rw-r--r--guava/src/com/google/common/util/concurrent/AbstractService.java457
1 files changed, 82 insertions, 375 deletions
diff --git a/guava/src/com/google/common/util/concurrent/AbstractService.java b/guava/src/com/google/common/util/concurrent/AbstractService.java
index f028a59..f84b374 100644
--- a/guava/src/com/google/common/util/concurrent/AbstractService.java
+++ b/guava/src/com/google/common/util/concurrent/AbstractService.java
@@ -16,148 +16,68 @@
package com.google.common.util.concurrent;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.Beta;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
import com.google.common.util.concurrent.Service.State; // javadoc needs this
-import java.util.List;
-import java.util.Queue;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.Immutable;
/**
- * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
- * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
- * callbacks. Its subclasses must manage threads manually; consider
- * {@link AbstractExecutionThreadService} if you need only a single execution thread.
+ * Base class for implementing services that can handle {@link #doStart} and
+ * {@link #doStop} requests, responding to them with {@link #notifyStarted()}
+ * and {@link #notifyStopped()} callbacks. Its subclasses must manage threads
+ * manually; consider {@link AbstractExecutionThreadService} if you need only a
+ * single execution thread.
*
* @author Jesse Wilson
- * @author Luke Sandberg
* @since 1.0
*/
@Beta
public abstract class AbstractService implements Service {
- private static final Logger logger = Logger.getLogger(AbstractService.class.getName());
+
private final ReentrantLock lock = new ReentrantLock();
private final Transition startup = new Transition();
private final Transition shutdown = new Transition();
/**
- * The listeners to notify during a state transition.
- */
- @GuardedBy("lock")
- private final List<ListenerExecutorPair> listeners = Lists.newArrayList();
-
- /**
- * The queue of listeners that are waiting to be executed.
- *
- * <p>Enqueue operations should be protected by {@link #lock} while dequeue operations should be
- * protected by the implicit lock on this object. Dequeue operations should be executed atomically
- * with the execution of the {@link Runnable} and additionally the {@link #lock} should not be
- * held when the listeners are being executed. Use {@link #executeListeners} for this operation.
- * This is necessary to ensure that elements on the queue are executed in the correct order.
- * Enqueue operations should be protected so that listeners are added in the correct order. We use
- * a concurrent queue implementation so that enqueues can be executed concurrently with dequeues.
+ * The internal state, which equals external state unless
+ * shutdownWhenStartupFinishes is true. Guarded by {@code lock}.
*/
- @GuardedBy("queuedListeners")
- private final Queue<Runnable> queuedListeners = Queues.newConcurrentLinkedQueue();
-
+ private State state = State.NEW;
+
/**
- * The current state of the service. This should be written with the lock held but can be read
- * without it because it is an immutable object in a volatile field. This is desirable so that
- * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
- * without grabbing the lock.
- *
- * <p>To update this field correctly the lock must be held to guarantee that the state is
- * consistent.
+ * If true, the user requested a shutdown while the service was still starting
+ * up. Guarded by {@code lock}.
*/
- @GuardedBy("lock")
- private volatile StateSnapshot snapshot = new StateSnapshot(State.NEW);
+ private boolean shutdownWhenStartupFinishes = false;
- /** Constructor for use by subclasses. */
- protected AbstractService() {
- // Add a listener to update the futures. This needs to be added first so that it is executed
- // before the other listeners. This way the other listeners can access the completed futures.
- addListener(
- new Listener() {
- @Override public void starting() {}
-
- @Override public void running() {
- startup.set(State.RUNNING);
- }
-
- @Override public void stopping(State from) {
- if (from == State.STARTING) {
- startup.set(State.STOPPING);
- }
- }
-
- @Override public void terminated(State from) {
- if (from == State.NEW) {
- startup.set(State.TERMINATED);
- }
- shutdown.set(State.TERMINATED);
- }
-
- @Override public void failed(State from, Throwable failure) {
- switch (from) {
- case STARTING:
- startup.setException(failure);
- shutdown.setException(new Exception("Service failed to start.", failure));
- break;
- case RUNNING:
- shutdown.setException(new Exception("Service failed while running", failure));
- break;
- case STOPPING:
- shutdown.setException(failure);
- break;
- case TERMINATED: /* fall-through */
- case FAILED: /* fall-through */
- case NEW: /* fall-through */
- default:
- throw new AssertionError("Unexpected from state: " + from);
- }
- }
- },
- MoreExecutors.sameThreadExecutor());
- }
-
/**
- * This method is called by {@link #start} to initiate service startup. The invocation of this
- * method should cause a call to {@link #notifyStarted()}, either during this method's run, or
- * after it has returned. If startup fails, the invocation should cause a call to
- * {@link #notifyFailed(Throwable)} instead.
+ * This method is called by {@link #start} to initiate service startup. The
+ * invocation of this method should cause a call to {@link #notifyStarted()},
+ * either during this method's run, or after it has returned. If startup
+ * fails, the invocation should cause a call to {@link
+ * #notifyFailed(Throwable)} instead.
*
- * <p>This method should return promptly; prefer to do work on a different thread where it is
- * convenient. It is invoked exactly once on service startup, even when {@link #start} is called
- * multiple times.
+ * <p>This method should return promptly; prefer to do work on a different
+ * thread where it is convenient. It is invoked exactly once on service
+ * startup, even when {@link #start} is called multiple times.
*/
protected abstract void doStart();
/**
- * This method should be used to initiate service shutdown. The invocation of this method should
- * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
- * returned. If shutdown fails, the invocation should cause a call to
- * {@link #notifyFailed(Throwable)} instead.
+ * This method should be used to initiate service shutdown. The invocation
+ * of this method should cause a call to {@link #notifyStopped()}, either
+ * during this method's run, or after it has returned. If shutdown fails, the
+ * invocation should cause a call to {@link #notifyFailed(Throwable)} instead.
*
- * <p> This method should return promptly; prefer to do work on a different thread where it is
- * convenient. It is invoked exactly once on service shutdown, even when {@link #stop} is called
- * multiple times.
+ * <p>This method should return promptly; prefer to do work on a different
+ * thread where it is convenient. It is invoked exactly once on service
+ * shutdown, even when {@link #stop} is called multiple times.
*/
protected abstract void doStop();
@@ -165,16 +85,15 @@ public abstract class AbstractService implements Service {
public final ListenableFuture<State> start() {
lock.lock();
try {
- if (snapshot.state == State.NEW) {
- snapshot = new StateSnapshot(State.STARTING);
- starting();
+ if (state == State.NEW) {
+ state = State.STARTING;
doStart();
}
} catch (Throwable startupFailure) {
+ // put the exception in the future, the user can get it via Future.get()
notifyFailed(startupFailure);
} finally {
lock.unlock();
- executeListeners();
}
return startup;
@@ -184,33 +103,22 @@ public abstract class AbstractService implements Service {
public final ListenableFuture<State> stop() {
lock.lock();
try {
- switch (snapshot.state) {
- case NEW:
- snapshot = new StateSnapshot(State.TERMINATED);
- terminated(State.NEW);
- break;
- case STARTING:
- snapshot = new StateSnapshot(State.STARTING, true, null);
- stopping(State.STARTING);
- break;
- case RUNNING:
- snapshot = new StateSnapshot(State.STOPPING);
- stopping(State.RUNNING);
- doStop();
- break;
- case STOPPING:
- case TERMINATED:
- case FAILED:
- // do nothing
- break;
- default:
- throw new AssertionError("Unexpected state: " + snapshot.state);
+ if (state == State.NEW) {
+ state = State.TERMINATED;
+ startup.set(State.TERMINATED);
+ shutdown.set(State.TERMINATED);
+ } else if (state == State.STARTING) {
+ shutdownWhenStartupFinishes = true;
+ startup.set(State.STOPPING);
+ } else if (state == State.RUNNING) {
+ state = State.STOPPING;
+ doStop();
}
} catch (Throwable shutdownFailure) {
+ // put the exception in the future, the user can get it via Future.get()
notifyFailed(shutdownFailure);
} finally {
lock.unlock();
- executeListeners();
}
return shutdown;
@@ -227,91 +135,84 @@ public abstract class AbstractService implements Service {
}
/**
- * Implementing classes should invoke this method once their service has started. It will cause
- * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
+ * Implementing classes should invoke this method once their service has
+ * started. It will cause the service to transition from {@link
+ * State#STARTING} to {@link State#RUNNING}.
*
- * @throws IllegalStateException if the service is not {@link State#STARTING}.
+ * @throws IllegalStateException if the service is not
+ * {@link State#STARTING}.
*/
protected final void notifyStarted() {
lock.lock();
try {
- if (snapshot.state != State.STARTING) {
+ if (state != State.STARTING) {
IllegalStateException failure = new IllegalStateException(
- "Cannot notifyStarted() when the service is " + snapshot.state);
+ "Cannot notifyStarted() when the service is " + state);
notifyFailed(failure);
throw failure;
}
- if (snapshot.shutdownWhenStartupFinishes) {
- snapshot = new StateSnapshot(State.STOPPING);
- // We don't call listeners here because we already did that when we set the
- // shutdownWhenStartupFinishes flag.
- doStop();
+ state = State.RUNNING;
+ if (shutdownWhenStartupFinishes) {
+ stop();
} else {
- snapshot = new StateSnapshot(State.RUNNING);
- running();
+ startup.set(State.RUNNING);
}
} finally {
lock.unlock();
- executeListeners();
}
}
/**
- * Implementing classes should invoke this method once their service has stopped. It will cause
- * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}.
+ * Implementing classes should invoke this method once their service has
+ * stopped. It will cause the service to transition from {@link
+ * State#STOPPING} to {@link State#TERMINATED}.
*
- * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor
- * {@link State#RUNNING}.
+ * @throws IllegalStateException if the service is neither {@link
+ * State#STOPPING} nor {@link State#RUNNING}.
*/
protected final void notifyStopped() {
lock.lock();
try {
- if (snapshot.state != State.STOPPING && snapshot.state != State.RUNNING) {
+ if (state != State.STOPPING && state != State.RUNNING) {
IllegalStateException failure = new IllegalStateException(
- "Cannot notifyStopped() when the service is " + snapshot.state);
+ "Cannot notifyStopped() when the service is " + state);
notifyFailed(failure);
throw failure;
}
- State previous = snapshot.state;
- snapshot = new StateSnapshot(State.TERMINATED);
- terminated(previous);
+
+ state = State.TERMINATED;
+ shutdown.set(State.TERMINATED);
} finally {
lock.unlock();
- executeListeners();
}
}
/**
- * Invoke this method to transition the service to the {@link State#FAILED}. The service will
- * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
- * or otherwise cannot be started nor stopped.
+ * Invoke this method to transition the service to the
+ * {@link State#FAILED}. The service will <b>not be stopped</b> if it
+ * is running. Invoke this method when a service has failed critically or
+ * otherwise cannot be started nor stopped.
*/
protected final void notifyFailed(Throwable cause) {
checkNotNull(cause);
lock.lock();
try {
- switch (snapshot.state) {
- case NEW:
- case TERMINATED:
- throw new IllegalStateException("Failed while in state:" + snapshot.state, cause);
- case RUNNING:
- case STARTING:
- case STOPPING:
- State previous = snapshot.state;
- snapshot = new StateSnapshot(State.FAILED, false, cause);
- failed(previous, cause);
- break;
- case FAILED:
- // Do nothing
- break;
- default:
- throw new AssertionError("Unexpected state: " + snapshot.state);
+ if (state == State.STARTING) {
+ startup.setException(cause);
+ shutdown.setException(new Exception(
+ "Service failed to start.", cause));
+ } else if (state == State.STOPPING) {
+ shutdown.setException(cause);
+ } else if (state == State.RUNNING) {
+ shutdown.setException(new Exception("Service failed while running", cause));
+ } else if (state == State.NEW || state == State.TERMINATED) {
+ throw new IllegalStateException("Failed while in state:" + state, cause);
}
+ state = State.FAILED;
} finally {
lock.unlock();
- executeListeners();
}
}
@@ -322,28 +223,12 @@ public abstract class AbstractService implements Service {
@Override
public final State state() {
- return snapshot.externalState();
- }
-
- /**
- * @since 14.0
- */
- @Override
- public final Throwable failureCause() {
- return snapshot.failureCause();
- }
-
- /**
- * @since 13.0
- */
- @Override
- public final void addListener(Listener listener, Executor executor) {
- checkNotNull(listener, "listener");
- checkNotNull(executor, "executor");
lock.lock();
try {
- if (snapshot.state != State.TERMINATED && snapshot.state != State.FAILED) {
- listeners.add(new ListenerExecutorPair(listener, executor));
+ if (shutdownWhenStartupFinishes && state == State.STARTING) {
+ return State.STOPPING;
+ } else {
+ return state;
}
} finally {
lock.unlock();
@@ -368,182 +253,4 @@ public abstract class AbstractService implements Service {
}
}
}
-
- /**
- * Attempts to execute all the listeners in {@link #queuedListeners} while not holding the
- * {@link #lock}.
- */
- private void executeListeners() {
- if (!lock.isHeldByCurrentThread()) {
- synchronized (queuedListeners) {
- Runnable listener;
- while ((listener = queuedListeners.poll()) != null) {
- listener.run();
- }
- }
- }
- }
-
- @GuardedBy("lock")
- private void starting() {
- for (final ListenerExecutorPair pair : listeners) {
- queuedListeners.add(new Runnable() {
- @Override public void run() {
- pair.execute(new Runnable() {
- @Override public void run() {
- pair.listener.starting();
- }
- });
- }
- });
- }
- }
-
- @GuardedBy("lock")
- private void running() {
- for (final ListenerExecutorPair pair : listeners) {
- queuedListeners.add(new Runnable() {
- @Override public void run() {
- pair.execute(new Runnable() {
- @Override public void run() {
- pair.listener.running();
- }
- });
- }
- });
- }
- }
-
- @GuardedBy("lock")
- private void stopping(final State from) {
- for (final ListenerExecutorPair pair : listeners) {
- queuedListeners.add(new Runnable() {
- @Override public void run() {
- pair.execute(new Runnable() {
- @Override public void run() {
- pair.listener.stopping(from);
- }
- });
- }
- });
- }
- }
-
- @GuardedBy("lock")
- private void terminated(final State from) {
- for (final ListenerExecutorPair pair : listeners) {
- queuedListeners.add(new Runnable() {
- @Override public void run() {
- pair.execute(new Runnable() {
- @Override public void run() {
- pair.listener.terminated(from);
- }
- });
- }
- });
- }
- // There are no more state transitions so we can clear this out.
- listeners.clear();
- }
-
- @GuardedBy("lock")
- private void failed(final State from, final Throwable cause) {
- for (final ListenerExecutorPair pair : listeners) {
- queuedListeners.add(new Runnable() {
- @Override public void run() {
- pair.execute(new Runnable() {
- @Override public void run() {
- pair.listener.failed(from, cause);
- }
- });
- }
- });
- }
- // There are no more state transitions so we can clear this out.
- listeners.clear();
- }
-
- /** A simple holder for a listener and its executor. */
- private static class ListenerExecutorPair {
- final Listener listener;
- final Executor executor;
-
- ListenerExecutorPair(Listener listener, Executor executor) {
- this.listener = listener;
- this.executor = executor;
- }
-
- /**
- * Executes the given {@link Runnable} on {@link #executor} logging and swallowing all
- * exceptions
- */
- void execute(Runnable runnable) {
- try {
- executor.execute(runnable);
- } catch (Exception e) {
- logger.log(Level.SEVERE, "Exception while executing listener " + listener
- + " with executor " + executor, e);
- }
- }
- }
-
- /**
- * An immutable snapshot of the current state of the service. This class represents a consistent
- * snapshot of the state and therefore it can be used to answer simple queries without needing to
- * grab a lock.
- */
- @Immutable
- private static final class StateSnapshot {
- /**
- * The internal state, which equals external state unless
- * shutdownWhenStartupFinishes is true.
- */
- final State state;
-
- /**
- * If true, the user requested a shutdown while the service was still starting
- * up.
- */
- final boolean shutdownWhenStartupFinishes;
-
- /**
- * The exception that caused this service to fail. This will be {@code null}
- * unless the service has failed.
- */
- @Nullable
- final Throwable failure;
-
- StateSnapshot(State internalState) {
- this(internalState, false, null);
- }
-
- StateSnapshot(
- State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) {
- checkArgument(!shutdownWhenStartupFinishes || internalState == State.STARTING,
- "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
- internalState);
- checkArgument(!(failure != null ^ internalState == State.FAILED),
- "A failure cause should be set if and only if the state is failed. Got %s and %s "
- + "instead.", internalState, failure);
- this.state = internalState;
- this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
- this.failure = failure;
- }
-
- /** @see Service#state() */
- State externalState() {
- if (shutdownWhenStartupFinishes && state == State.STARTING) {
- return State.STOPPING;
- } else {
- return state;
- }
- }
-
- /** @see Service#failureCause() */
- Throwable failureCause() {
- checkState(state == State.FAILED,
- "failureCause() is only valid if the service has failed, service is %s", state);
- return failure;
- }
- }
}