aboutsummaryrefslogtreecommitdiffstats
path: root/guava/src/com/google/common/util/concurrent/MoreExecutors.java
diff options
context:
space:
mode:
Diffstat (limited to 'guava/src/com/google/common/util/concurrent/MoreExecutors.java')
-rw-r--r--guava/src/com/google/common/util/concurrent/MoreExecutors.java299
1 files changed, 45 insertions, 254 deletions
diff --git a/guava/src/com/google/common/util/concurrent/MoreExecutors.java b/guava/src/com/google/common/util/concurrent/MoreExecutors.java
index bd94db7..915b96d 100644
--- a/guava/src/com/google/common/util/concurrent/MoreExecutors.java
+++ b/guava/src/com/google/common/util/concurrent/MoreExecutors.java
@@ -16,26 +16,15 @@
package com.google.common.util.concurrent;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.Beta;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -44,7 +33,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -79,8 +67,16 @@ public final class MoreExecutors {
@Beta
public static ExecutorService getExitingExecutorService(
ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
- return new Application()
- .getExitingExecutorService(executor, terminationTimeout, timeUnit);
+ executor.setThreadFactory(new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setThreadFactory(executor.getThreadFactory())
+ .build());
+
+ ExecutorService service = Executors.unconfigurableExecutorService(executor);
+
+ addDelayedShutdownHook(service, terminationTimeout, timeUnit);
+
+ return service;
}
/**
@@ -101,9 +97,19 @@ public final class MoreExecutors {
*/
@Beta
public static ScheduledExecutorService getExitingScheduledExecutorService(
- ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
- return new Application()
- .getExitingScheduledExecutorService(executor, terminationTimeout, timeUnit);
+ ScheduledThreadPoolExecutor executor, long terminationTimeout,
+ TimeUnit timeUnit) {
+ executor.setThreadFactory(new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setThreadFactory(executor.getThreadFactory())
+ .build());
+
+ ScheduledExecutorService service =
+ Executors.unconfigurableScheduledExecutorService(executor);
+
+ addDelayedShutdownHook(service, terminationTimeout, timeUnit);
+
+ return service;
}
/**
@@ -119,9 +125,24 @@ public final class MoreExecutors {
*/
@Beta
public static void addDelayedShutdownHook(
- ExecutorService service, long terminationTimeout, TimeUnit timeUnit) {
- new Application()
- .addDelayedShutdownHook(service, terminationTimeout, timeUnit);
+ final ExecutorService service, final long terminationTimeout,
+ final TimeUnit timeUnit) {
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // We'd like to log progress and failures that may arise in the
+ // following code, but unfortunately the behavior of logging
+ // is undefined in shutdown hooks.
+ // This is because the logging code installs a shutdown hook of its
+ // own. See Cleaner class inside {@link LogManager}.
+ service.shutdown();
+ service.awaitTermination(terminationTimeout, timeUnit);
+ } catch (InterruptedException ignored) {
+ // We're shutting down anyway, so just ignore.
+ }
+ }
+ }));
}
/**
@@ -140,8 +161,9 @@ public final class MoreExecutors {
* @return an unmodifiable version of the input which will not hang the JVM
*/
@Beta
- public static ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
- return new Application().getExitingExecutorService(executor);
+ public static ExecutorService getExitingExecutorService(
+ ThreadPoolExecutor executor) {
+ return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
}
/**
@@ -162,69 +184,7 @@ public final class MoreExecutors {
@Beta
public static ScheduledExecutorService getExitingScheduledExecutorService(
ScheduledThreadPoolExecutor executor) {
- return new Application().getExitingScheduledExecutorService(executor);
- }
-
- /** Represents the current application to register shutdown hooks. */
- @VisibleForTesting static class Application {
-
- final ExecutorService getExitingExecutorService(
- ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
- useDaemonThreadFactory(executor);
- ExecutorService service = Executors.unconfigurableExecutorService(executor);
- addDelayedShutdownHook(service, terminationTimeout, timeUnit);
- return service;
- }
-
- final ScheduledExecutorService getExitingScheduledExecutorService(
- ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
- useDaemonThreadFactory(executor);
- ScheduledExecutorService service = Executors.unconfigurableScheduledExecutorService(executor);
- addDelayedShutdownHook(service, terminationTimeout, timeUnit);
- return service;
- }
-
- final void addDelayedShutdownHook(
- final ExecutorService service, final long terminationTimeout, final TimeUnit timeUnit) {
- checkNotNull(service);
- checkNotNull(timeUnit);
- addShutdownHook(MoreExecutors.newThread("DelayedShutdownHook-for-" + service, new Runnable() {
- @Override
- public void run() {
- try {
- // We'd like to log progress and failures that may arise in the
- // following code, but unfortunately the behavior of logging
- // is undefined in shutdown hooks.
- // This is because the logging code installs a shutdown hook of its
- // own. See Cleaner class inside {@link LogManager}.
- service.shutdown();
- service.awaitTermination(terminationTimeout, timeUnit);
- } catch (InterruptedException ignored) {
- // We're shutting down anyway, so just ignore.
- }
- }
- }));
- }
-
- final ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
- return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
- }
-
- final ScheduledExecutorService getExitingScheduledExecutorService(
- ScheduledThreadPoolExecutor executor) {
- return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
- }
-
- @VisibleForTesting void addShutdownHook(Thread hook) {
- Runtime.getRuntime().addShutdownHook(hook);
- }
- }
-
- private static void useDaemonThreadFactory(ThreadPoolExecutor executor) {
- executor.setThreadFactory(new ThreadFactoryBuilder()
- .setDaemon(true)
- .setThreadFactory(executor.getThreadFactory())
- .build());
+ return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
}
/**
@@ -483,7 +443,6 @@ public final class MoreExecutors {
private static class ScheduledListeningDecorator
extends ListeningDecorator implements ListeningScheduledExecutorService {
- @SuppressWarnings("hiding")
final ScheduledExecutorService delegate;
ScheduledListeningDecorator(ScheduledExecutorService delegate) {
@@ -516,172 +475,4 @@ public final class MoreExecutors {
command, initialDelay, delay, unit);
}
}
-
- /*
- * This following method is a modified version of one found in
- * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30
- * which contained the following notice:
- *
- * Written by Doug Lea with assistance from members of JCP JSR-166
- * Expert Group and released to the public domain, as explained at
- * http://creativecommons.org/publicdomain/zero/1.0/
- * Other contributors include Andrew Wright, Jeffrey Hayes,
- * Pat Fisher, Mike Judd.
- */
-
- /**
- * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}
- * implementations.
- */ static <T> T invokeAnyImpl(ListeningExecutorService executorService,
- Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
- throws InterruptedException, ExecutionException, TimeoutException {
- checkNotNull(executorService);
- int ntasks = tasks.size();
- checkArgument(ntasks > 0);
- List<Future<T>> futures = Lists.newArrayListWithCapacity(ntasks);
- BlockingQueue<Future<T>> futureQueue = Queues.newLinkedBlockingQueue();
-
- // For efficiency, especially in executors with limited
- // parallelism, check to see if previously submitted tasks are
- // done before submitting more of them. This interleaving
- // plus the exception mechanics account for messiness of main
- // loop.
-
- try {
- // Record exceptions so that if we fail to obtain any
- // result, we can throw the last exception we got.
- ExecutionException ee = null;
- long lastTime = timed ? System.nanoTime() : 0;
- Iterator<? extends Callable<T>> it = tasks.iterator();
-
- futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
- --ntasks;
- int active = 1;
-
- for (;;) {
- Future<T> f = futureQueue.poll();
- if (f == null) {
- if (ntasks > 0) {
- --ntasks;
- futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
- ++active;
- } else if (active == 0) {
- break;
- } else if (timed) {
- f = futureQueue.poll(nanos, TimeUnit.NANOSECONDS);
- if (f == null) {
- throw new TimeoutException();
- }
- long now = System.nanoTime();
- nanos -= now - lastTime;
- lastTime = now;
- } else {
- f = futureQueue.take();
- }
- }
- if (f != null) {
- --active;
- try {
- return f.get();
- } catch (ExecutionException eex) {
- ee = eex;
- } catch (RuntimeException rex) {
- ee = new ExecutionException(rex);
- }
- }
- }
-
- if (ee == null) {
- ee = new ExecutionException(null);
- }
- throw ee;
- } finally {
- for (Future<T> f : futures) {
- f.cancel(true);
- }
- }
- }
-
- /**
- * Submits the task and adds a listener that adds the future to {@code queue} when it completes.
- */
- private static <T> ListenableFuture<T> submitAndAddQueueListener(
- ListeningExecutorService executorService, Callable<T> task,
- final BlockingQueue<Future<T>> queue) {
- final ListenableFuture<T> future = executorService.submit(task);
- future.addListener(new Runnable() {
- @Override public void run() {
- queue.add(future);
- }
- }, MoreExecutors.sameThreadExecutor());
- return future;
- }
-
- /**
- * Returns a default thread factory used to create new threads.
- *
- * <p>On AppEngine, returns {@code ThreadManager.currentRequestThreadFactory()}.
- * Otherwise, returns {@link Executors#defaultThreadFactory()}.
- *
- * @since 14.0
- */
- @Beta
- public static ThreadFactory platformThreadFactory() {
- if (!isAppEngine()) {
- return Executors.defaultThreadFactory();
- }
- try {
- return (ThreadFactory) Class.forName("com.google.appengine.api.ThreadManager")
- .getMethod("currentRequestThreadFactory")
- .invoke(null);
- } catch (IllegalAccessException e) {
- throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
- } catch (NoSuchMethodException e) {
- throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
- } catch (InvocationTargetException e) {
- throw Throwables.propagate(e.getCause());
- }
- }
-
- private static boolean isAppEngine() {
- if (System.getProperty("com.google.appengine.runtime.environment") == null) {
- return false;
- }
- try {
- // If the current environment is null, we're not inside AppEngine.
- return Class.forName("com.google.apphosting.api.ApiProxy")
- .getMethod("getCurrentEnvironment")
- .invoke(null) != null;
- } catch (ClassNotFoundException e) {
- // If ApiProxy doesn't exist, we're not on AppEngine at all.
- return false;
- } catch (InvocationTargetException e) {
- // If ApiProxy throws an exception, we're not in a proper AppEngine environment.
- return false;
- } catch (IllegalAccessException e) {
- // If the method isn't accessible, we're not on a supported version of AppEngine;
- return false;
- } catch (NoSuchMethodException e) {
- // If the method doesn't exist, we're not on a supported version of AppEngine;
- return false;
- }
- }
-
- /**
- * Creates a thread using {@link #platformThreadFactory}, and sets its name to {@code name}
- * unless changing the name is forbidden by the security manager.
- */
- static Thread newThread(String name, Runnable runnable) {
- checkNotNull(name);
- checkNotNull(runnable);
- Thread result = platformThreadFactory().newThread(runnable);
- try {
- result.setName(name);
- } catch (SecurityException e) {
- // OK if we can't set the name in this environment.
- }
- return result;
- }
}