diff options
Diffstat (limited to 'guava/src/com/google/common/util/concurrent/ServiceManager.java')
-rw-r--r-- | guava/src/com/google/common/util/concurrent/ServiceManager.java | 724 |
1 files changed, 0 insertions, 724 deletions
diff --git a/guava/src/com/google/common/util/concurrent/ServiceManager.java b/guava/src/com/google/common/util/concurrent/ServiceManager.java deleted file mode 100644 index c779b23..0000000 --- a/guava/src/com/google/common/util/concurrent/ServiceManager.java +++ /dev/null @@ -1,724 +0,0 @@ -/* - * Copyright (C) 2012 The Guava Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.common.util.concurrent; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -import com.google.common.annotations.Beta; -import com.google.common.base.Function; -import com.google.common.base.Objects; -import com.google.common.base.Stopwatch; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Ordering; -import com.google.common.collect.Queues; -import com.google.common.util.concurrent.Service.State; - -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.annotation.concurrent.GuardedBy; -import javax.annotation.concurrent.Immutable; -import javax.inject.Inject; -import javax.inject.Singleton; - -/** - * A manager for monitoring and controlling a set of {@link Service services}. This class provides - * methods for {@linkplain #startAsync() starting}, {@linkplain #stopAsync() stopping} and - * {@linkplain #servicesByState inspecting} a collection of {@linkplain Service services}. - * Additionally, users can monitor state transitions with the {@link Listener listener} mechanism. - * - * <p>While it is recommended that service lifecycles be managed via this class, state transitions - * initiated via other mechanisms do not impact the correctness of its methods. For example, if the - * services are started by some mechanism besides {@link #startAsync}, the listeners will be invoked - * when appropriate and {@link #awaitHealthy} will still work as expected. - * - * <p>Here is a simple example of how to use a {@link ServiceManager} to start a server. - * <pre> {@code - * class Server { - * public static void main(String[] args) { - * Set<Service> services = ...; - * ServiceManager manager = new ServiceManager(services); - * manager.addListener(new Listener() { - * public void stopped() {} - * public void healthy() { - * // Services have been initialized and are healthy, start accepting requests... - * } - * public void failure(Service service) { - * // Something failed, at this point we could log it, notify a load balancer, or take - * // some other action. For now we will just exit. - * System.exit(1); - * } - * }, - * MoreExecutors.sameThreadExecutor()); - * - * Runtime.getRuntime().addShutdownHook(new Thread() { - * public void run() { - * // Give the services 5 seconds to stop to ensure that we are responsive to shutdown - * // requests. - * try { - * manager.stopAsync().awaitStopped(5, TimeUnit.SECONDS); - * } catch (TimeoutException timeout) { - * // stopping timed out - * } - * } - * }); - * manager.startAsync(); // start all the services asynchronously - * } - * }}</pre> - * - * This class uses the ServiceManager's methods to start all of its services, to respond to service - * failure and to ensure that when the JVM is shutting down all the services are stopped. - * - * @author Luke Sandberg - * @since 14.0 - */ -@Beta -@Singleton -public final class ServiceManager { - private static final Logger logger = Logger.getLogger(ServiceManager.class.getName()); - - /** - * A listener for the aggregate state changes of the services that are under management. Users - * that need to listen to more fine-grained events (such as when each particular - * {@link Service service} starts, or terminates), should attach {@link Service.Listener service - * listeners} to each individual service. - * - * @author Luke Sandberg - * @since 14.0 - */ - @Beta // Should come out of Beta when ServiceManager does - public static interface Listener { - /** - * Called when the service initially becomes healthy. - * - * <p>This will be called at most once after all the services have entered the - * {@linkplain State#RUNNING running} state. If any services fail during start up or - * {@linkplain State#FAILED fail}/{@linkplain State#TERMINATED terminate} before all other - * services have started {@linkplain State#RUNNING running} then this method will not be called. - */ - void healthy(); - - /** - * Called when the all of the component services have reached a terminal state, either - * {@linkplain State#TERMINATED terminated} or {@linkplain State#FAILED failed}. - */ - void stopped(); - - /** - * Called when a component service has {@linkplain State#FAILED failed}. - * - * @param service The service that failed. - */ - void failure(Service service); - } - - /** - * An encapsulation of all of the state that is accessed by the {@linkplain ServiceListener - * service listeners}. This is extracted into its own object so that {@link ServiceListener} - * could be made {@code static} and its instances can be safely constructed and added in the - * {@link ServiceManager} constructor without having to close over the partially constructed - * {@link ServiceManager} instance (i.e. avoid leaking a pointer to {@code this}). - */ - private final ServiceManagerState state; - private final ImmutableMap<Service, ServiceListener> services; - - /** - * Constructs a new instance for managing the given services. - * - * @param services The services to manage - * - * @throws IllegalArgumentException if not all services are {@link State#NEW new} or if there are - * any duplicate services. - */ - public ServiceManager(Iterable<? extends Service> services) { - ImmutableList<Service> copy = ImmutableList.copyOf(services); - this.state = new ServiceManagerState(copy.size()); - ImmutableMap.Builder<Service, ServiceListener> builder = ImmutableMap.builder(); - Executor executor = MoreExecutors.sameThreadExecutor(); - for (Service service : copy) { - ServiceListener listener = new ServiceListener(service, state); - service.addListener(listener, executor); - // We check the state after adding the listener as a way to ensure that our listener was added - // to a NEW service. - checkArgument(service.state() == State.NEW, "Can only manage NEW services, %s", service); - builder.put(service, listener); - } - this.services = builder.build(); - } - - /** - * Constructs a new instance for managing the given services. This constructor is provided so that - * dependency injection frameworks can inject instances of {@link ServiceManager}. - * - * @param services The services to manage - * - * @throws IllegalStateException if not all services are {@link State#NEW new}. - */ - @Inject ServiceManager(Set<Service> services) { - this((Iterable<Service>) services); - } - - /** - * Registers a {@link Listener} to be {@linkplain Executor#execute executed} on the given - * executor. The listener will not have previous state changes replayed, so it is - * suggested that listeners are added before any of the managed services are - * {@linkplain Service#start started}. - * - * <p>There is no guaranteed ordering of execution of listeners, but any listener added through - * this method is guaranteed to be called whenever there is a state change. - * - * <p>Exceptions thrown by a listener will be propagated up to the executor. Any exception thrown - * during {@code Executor.execute} (e.g., a {@code RejectedExecutionException} or an exception - * thrown by {@linkplain MoreExecutors#sameThreadExecutor inline execution}) will be caught and - * logged. - * - * @param listener the listener to run when the manager changes state - * @param executor the executor in which the the listeners callback methods will be run. For fast, - * lightweight listeners that would be safe to execute in any thread, consider - * {@link MoreExecutors#sameThreadExecutor}. - */ - public void addListener(Listener listener, Executor executor) { - state.addListener(listener, executor); - } - - /** - * Initiates service {@linkplain Service#start startup} on all the services being managed. It is - * only valid to call this method if all of the services are {@linkplain State#NEW new}. - * - * @return this - * @throws IllegalStateException if any of the Services are not {@link State#NEW new} when the - * method is called, {@link State#TERMINATED terminated} or {@link State#FAILED failed}. - */ - public ServiceManager startAsync() { - for (Map.Entry<Service, ServiceListener> entry : services.entrySet()) { - Service service = entry.getKey(); - State state = service.state(); - checkState(state == State.NEW, "Service %s is %s, cannot start it.", service, - state); - } - for (ServiceListener service : services.values()) { - service.start(); - } - return this; - } - - /** - * Waits for the {@link ServiceManager} to become {@linkplain #isHealthy() healthy}. The manager - * will become healthy after all the component services have reached the {@linkplain State#RUNNING - * running} state. - * - * @throws IllegalStateException if the service manager reaches a state from which it cannot - * become {@linkplain #isHealthy() healthy}. - */ - public void awaitHealthy() { - state.awaitHealthy(); - checkState(isHealthy(), "Expected to be healthy after starting"); - } - - /** - * Waits for the {@link ServiceManager} to become {@linkplain #isHealthy() healthy} for no more - * than the given time. The manager will become healthy after all the component services have - * reached the {@linkplain State#RUNNING running} state. - * - * @param timeout the maximum time to wait - * @param unit the time unit of the timeout argument - * @throws TimeoutException if not all of the services have finished starting within the deadline - * @throws IllegalStateException if the service manager reaches a state from which it cannot - * become {@linkplain #isHealthy() healthy}. - */ - public void awaitHealthy(long timeout, TimeUnit unit) throws TimeoutException { - if (!state.awaitHealthy(timeout, unit)) { - // It would be nice to tell the caller who we are still waiting on, and this information is - // likely to be in servicesByState(), however due to race conditions we can't actually tell - // which services are holding up healthiness. The current set of NEW or STARTING services is - // likely to point out the culprit, but may not. If we really wanted to solve this we could - // change state to track exactly which services have started and then we could accurately - // report on this. But it is only for logging so we likely don't care. - throw new TimeoutException("Timeout waiting for the services to become healthy."); - } - checkState(isHealthy(), "Expected to be healthy after starting"); - } - - /** - * Initiates service {@linkplain Service#stop shutdown} if necessary on all the services being - * managed. - * - * @return this - */ - public ServiceManager stopAsync() { - for (Service service : services.keySet()) { - service.stop(); - } - return this; - } - - /** - * Waits for the all the services to reach a terminal state. After this method returns all - * services will either be {@link Service.State#TERMINATED terminated} or - * {@link Service.State#FAILED failed} - */ - public void awaitStopped() { - state.awaitStopped(); - } - - /** - * Waits for the all the services to reach a terminal state for no more than the given time. After - * this method returns all services will either be {@link Service.State#TERMINATED terminated} or - * {@link Service.State#FAILED failed} - * - * @param timeout the maximum time to wait - * @param unit the time unit of the timeout argument - * @throws TimeoutException if not all of the services have stopped within the deadline - */ - public void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException { - if (!state.awaitStopped(timeout, unit)) { - throw new TimeoutException("Timeout waiting for the services to stop."); - } - } - - /** - * Returns true if all services are currently in the {@linkplain State#RUNNING running} state. - * - * <p>Users who want more detailed information should use the {@link #servicesByState} method to - * get detailed information about which services are not running. - */ - public boolean isHealthy() { - for (Service service : services.keySet()) { - if (!service.isRunning()) { - return false; - } - } - return true; - } - - /** - * Provides a snapshot of the current state of all the services under management. - * - * <p>N.B. This snapshot it not guaranteed to be consistent, i.e. the set of states returned may - * not correspond to any particular point in time view of the services. - */ - public ImmutableMultimap<State, Service> servicesByState() { - ImmutableMultimap.Builder<State, Service> builder = ImmutableMultimap.builder(); - for (Service service : services.keySet()) { - builder.put(service.state(), service); - } - return builder.build(); - } - - /** - * Returns the service load times. This value will only return startup times for services that - * have finished starting. - * - * @return Map of services and their corresponding startup time in millis, the map entries will be - * ordered by startup time. - */ - public ImmutableMap<Service, Long> startupTimes() { - Map<Service, Long> loadTimeMap = Maps.newHashMapWithExpectedSize(services.size()); - for (Map.Entry<Service, ServiceListener> entry : services.entrySet()) { - State state = entry.getKey().state(); - if (state != State.NEW && state != State.STARTING) { - loadTimeMap.put(entry.getKey(), entry.getValue().startupTimeMillis()); - } - } - List<Entry<Service, Long>> servicesByStartTime = Ordering.<Long>natural() - .onResultOf(new Function<Map.Entry<Service, Long>, Long>() { - @Override public Long apply(Map.Entry<Service, Long> input) { - return input.getValue(); - } - }) - .sortedCopy(loadTimeMap.entrySet()); - ImmutableMap.Builder<Service, Long> builder = ImmutableMap.builder(); - for (Map.Entry<Service, Long> entry : servicesByStartTime) { - builder.put(entry); - } - return builder.build(); - } - - @Override public String toString() { - return Objects.toStringHelper(ServiceManager.class) - .add("services", services.keySet()) - .toString(); - } - - /** - * An encapsulation of all the mutable state of the {@link ServiceManager} that needs to be - * accessed by instances of {@link ServiceListener}. - */ - private static final class ServiceManagerState { - final Monitor monitor = new Monitor(); - final int numberOfServices; - /** The number of services that have not finished starting up. */ - @GuardedBy("monitor") - int unstartedServices; - /** The number of services that have not reached a terminal state. */ - @GuardedBy("monitor") - int unstoppedServices; - /** - * Controls how long to wait for all the service manager to either become healthy or reach a - * state where it is guaranteed that it can never become healthy. - */ - final Monitor.Guard awaitHealthGuard = new Monitor.Guard(monitor) { - @Override public boolean isSatisfied() { - // All services have started or some service has terminated/failed. - return unstartedServices == 0 || unstoppedServices != numberOfServices; - } - }; - /** - * Controls how long to wait for all services to reach a terminal state. - */ - final Monitor.Guard stoppedGuard = new Monitor.Guard(monitor) { - @Override public boolean isSatisfied() { - return unstoppedServices == 0; - } - }; - /** The listeners to notify during a state transition. */ - @GuardedBy("monitor") - final List<ListenerExecutorPair> listeners = Lists.newArrayList(); - /** - * The queue of listeners that are waiting to be executed. - * - * <p>Enqueue operations should be protected by {@link #monitor} while dequeue operations should - * be protected by the implicit lock on this object. This is to ensure that listeners are - * executed in the correct order and also so that a listener can not hold the {@link #monitor} - * for an arbitrary amount of time (listeners can only block other listeners, not internal state - * transitions). We use a concurrent queue implementation so that enqueues can be executed - * concurrently with dequeues. - */ - @GuardedBy("queuedListeners") - final Queue<Runnable> queuedListeners = Queues.newConcurrentLinkedQueue(); - - ServiceManagerState(int numberOfServices) { - this.numberOfServices = numberOfServices; - this.unstoppedServices = numberOfServices; - this.unstartedServices = numberOfServices; - } - - void addListener(Listener listener, Executor executor) { - checkNotNull(listener, "listener"); - checkNotNull(executor, "executor"); - monitor.enter(); - try { - // no point in adding a listener that will never be called - if (unstartedServices > 0 || unstoppedServices > 0) { - listeners.add(new ListenerExecutorPair(listener, executor)); - } - } finally { - monitor.leave(); - } - } - - void awaitHealthy() { - monitor.enter(); - try { - monitor.waitForUninterruptibly(awaitHealthGuard); - } finally { - monitor.leave(); - } - } - - boolean awaitHealthy(long timeout, TimeUnit unit) { - monitor.enter(); - try { - if (monitor.waitForUninterruptibly(awaitHealthGuard, timeout, unit)) { - return true; - } - return false; - } finally { - monitor.leave(); - } - } - - void awaitStopped() { - monitor.enter(); - try { - monitor.waitForUninterruptibly(stoppedGuard); - } finally { - monitor.leave(); - } - } - - boolean awaitStopped(long timeout, TimeUnit unit) { - monitor.enter(); - try { - return monitor.waitForUninterruptibly(stoppedGuard, timeout, unit); - } finally { - monitor.leave(); - } - } - - /** - * This should be called when a service finishes starting up. - * - * @param currentlyHealthy whether or not the service that finished starting was healthy at the - * time that it finished starting. - */ - @GuardedBy("monitor") - private void serviceFinishedStarting(Service service, boolean currentlyHealthy) { - checkState(unstartedServices > 0, - "All services should have already finished starting but %s just finished.", service); - unstartedServices--; - if (currentlyHealthy && unstartedServices == 0 && unstoppedServices == numberOfServices) { - // This means that the manager is currently healthy, or at least it should have been - // healthy at some point from some perspective. Calling isHealthy is not currently - // guaranteed to return true because any service could fail right now. However, the - // happens-before relationship enforced by the monitor ensures that this method was called - // before either serviceTerminated or serviceFailed, so we know that the manager was at - // least healthy for some period of time. Furthermore we are guaranteed that this call to - // healthy() will be before any call to terminated() or failure(Service) on the listener. - // So it is correct to execute the listener's health() callback. - for (final ListenerExecutorPair pair : listeners) { - queuedListeners.add(new Runnable() { - @Override public void run() { - pair.execute(new Runnable() { - @Override public void run() { - pair.listener.healthy(); - } - }); - } - }); - } - } - } - - /** - * This should be called when a service is {@linkplain State#TERMINATED terminated}. - */ - @GuardedBy("monitor") - private void serviceTerminated(Service service) { - serviceStopped(service); - } - - /** - * This should be called when a service is {@linkplain State#FAILED failed}. - */ - @GuardedBy("monitor") - private void serviceFailed(final Service service) { - for (final ListenerExecutorPair pair : listeners) { - queuedListeners.add(new Runnable() { - @Override public void run() { - pair.execute(new Runnable() { - @Override public void run() { - pair.listener.failure(service); - } - }); - } - }); - } - serviceStopped(service); - } - - /** - * Should be called whenever a service reaches a terminal state ( - * {@linkplain State#TERMINATED terminated} or - * {@linkplain State#FAILED failed}). - */ - @GuardedBy("monitor") - private void serviceStopped(Service service) { - checkState(unstoppedServices > 0, - "All services should have already stopped but %s just stopped.", service); - unstoppedServices--; - if (unstoppedServices == 0) { - checkState(unstartedServices == 0, - "All services are stopped but %d services haven't finished starting", - unstartedServices); - for (final ListenerExecutorPair pair : listeners) { - queuedListeners.add(new Runnable() { - @Override public void run() { - pair.execute(new Runnable() { - @Override public void run() { - pair.listener.stopped(); - } - }); - } - }); - } - // no more listeners could possibly be called, so clear them out - listeners.clear(); - } - } - - /** - * Attempts to execute all the listeners in {@link #queuedListeners}. - */ - private void executeListeners() { - checkState(!monitor.isOccupiedByCurrentThread(), - "It is incorrect to execute listeners with the monitor held."); - synchronized (queuedListeners) { - Runnable listener; - while ((listener = queuedListeners.poll()) != null) { - listener.run(); - } - } - } - } - - /** - * A {@link Service} that wraps another service and times how long it takes for it to start and - * also calls the {@link ServiceManagerState#serviceFinishedStarting}, - * {@link ServiceManagerState#serviceTerminated} and {@link ServiceManagerState#serviceFailed} - * according to its current state. - */ - private static final class ServiceListener implements Service.Listener { - @GuardedBy("watch") // AFAICT Stopwatch is not thread safe so we need to protect accesses - final Stopwatch watch = new Stopwatch(); - final Service service; - final ServiceManagerState state; - - /** - * @param service the service that - */ - ServiceListener(Service service, ServiceManagerState state) { - this.service = service; - this.state = state; - } - - @Override public void starting() { - // This can happen if someone besides the ServiceManager starts the service, in this case - // our timings may be inaccurate. - startTimer(); - } - - @Override public void running() { - state.monitor.enter(); - try { - finishedStarting(true); - } finally { - state.monitor.leave(); - state.executeListeners(); - } - } - - @Override public void stopping(State from) { - if (from == State.STARTING) { - state.monitor.enter(); - try { - finishedStarting(false); - } finally { - state.monitor.leave(); - state.executeListeners(); - } - } - } - - @Override public void terminated(State from) { - logger.info("Service " + service + " has terminated. Previous state was " + from + " state."); - state.monitor.enter(); - try { - if (from == State.NEW) { - // startTimer is idempotent, so this is safe to call and it may be necessary if no one has - // started the timer yet. - startTimer(); - finishedStarting(false); - } - state.serviceTerminated(service); - } finally { - state.monitor.leave(); - state.executeListeners(); - } - } - - @Override public void failed(State from, Throwable failure) { - logger.log(Level.SEVERE, "Service " + service + " has failed in the " + from + " state.", - failure); - state.monitor.enter(); - try { - if (from == State.STARTING) { - finishedStarting(false); - } - state.serviceFailed(service); - } finally { - state.monitor.leave(); - state.executeListeners(); - } - } - - /** - * Stop the stopwatch, log the startup time and decrement the startup latch - * - * @param currentlyHealthy whether or not the service that finished starting is currently - * healthy - */ - @GuardedBy("monitor") - void finishedStarting(boolean currentlyHealthy) { - synchronized (watch) { - watch.stop(); - logger.log(Level.INFO, "Started " + service + " in " + startupTimeMillis() + " ms."); - } - state.serviceFinishedStarting(service, currentlyHealthy); - } - - void start() { - startTimer(); - service.start(); - } - - /** Start the timer if it hasn't been started. */ - void startTimer() { - synchronized (watch) { - if (!watch.isRunning()) { // only start the watch once. - watch.start(); - logger.log(Level.INFO, "Starting {0}", service); - } - } - } - - /** Returns the amount of time it took for the service to finish starting in milliseconds. */ - synchronized long startupTimeMillis() { - synchronized (watch) { - return watch.elapsed(MILLISECONDS); - } - } - } - - /** Simple value object binding a listener to its executor. */ - @Immutable private static final class ListenerExecutorPair { - final Listener listener; - final Executor executor; - - ListenerExecutorPair(Listener listener, Executor executor) { - this.listener = listener; - this.executor = executor; - } - - /** - * Executes the given {@link Runnable} on {@link #executor} logging and swallowing all - * exceptions - */ - void execute(Runnable runnable) { - try { - executor.execute(runnable); - } catch (Exception e) { - logger.log(Level.SEVERE, "Exception while executing listener " + listener - + " with executor " + executor, e); - } - } - } -} |