aboutsummaryrefslogtreecommitdiffstats
path: root/guava-tests/test/com/google/common/util/concurrent/MoreExecutorsTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'guava-tests/test/com/google/common/util/concurrent/MoreExecutorsTest.java')
-rw-r--r--guava-tests/test/com/google/common/util/concurrent/MoreExecutorsTest.java240
1 files changed, 230 insertions, 10 deletions
diff --git a/guava-tests/test/com/google/common/util/concurrent/MoreExecutorsTest.java b/guava-tests/test/com/google/common/util/concurrent/MoreExecutorsTest.java
index 2aad6b3..3b0af7f 100644
--- a/guava-tests/test/com/google/common/util/concurrent/MoreExecutorsTest.java
+++ b/guava-tests/test/com/google/common/util/concurrent/MoreExecutorsTest.java
@@ -31,24 +31,34 @@ package com.google.common.util.concurrent;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.util.concurrent.MoreExecutors.invokeAnyImpl;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
+import static com.google.common.util.concurrent.MoreExecutors.renamingDecorator;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
+import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.truth0.Truth.ASSERT;
+import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.testing.ClassSanityTester;
-import com.google.common.testing.FluentAsserts;
import com.google.common.util.concurrent.MoreExecutors.Application;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@@ -56,15 +66,14 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import org.mockito.InOrder;
-import org.mockito.Mockito;
-
/**
* Tests for MoreExecutors.
*
@@ -104,8 +113,8 @@ public class MoreExecutorsTest extends JSR166TestCase {
Future<?> future = executor.submit(incrementTask);
assertTrue(future.isDone());
assertEquals(1, threadLocalCount.get().intValue());
- } catch (Throwable Throwable) {
- throwableFromOtherThread.set(Throwable);
+ } catch (Throwable t) {
+ throwableFromOtherThread.set(t);
}
}
});
@@ -187,8 +196,8 @@ public class MoreExecutorsTest extends JSR166TestCase {
assertTrue(future.isDone());
assertTrue(executor.isShutdown());
assertTrue(executor.isTerminated());
- } catch (Throwable Throwable) {
- throwableFromOtherThread.set(Throwable);
+ } catch (Throwable t) {
+ throwableFromOtherThread.set(t);
}
}});
@@ -249,6 +258,14 @@ public class MoreExecutorsTest extends JSR166TestCase {
} catch (RejectedExecutionException expected) {}
}
+ public <T> void testListeningExecutorServiceInvokeAllJavadocCodeCompiles()
+ throws Exception {
+ ListeningExecutorService executor = MoreExecutors.sameThreadExecutor();
+ List<Callable<T>> tasks = ImmutableList.of();
+ @SuppressWarnings("unchecked") // guaranteed by invokeAll contract
+ List<ListenableFuture<T>> futures = (List) executor.invokeAll(tasks);
+ }
+
public void testListeningDecorator() throws Exception {
ListeningExecutorService service =
listeningDecorator(MoreExecutors.sameThreadExecutor());
@@ -258,10 +275,10 @@ public class MoreExecutorsTest extends JSR166TestCase {
List<Future<String>> results;
results = service.invokeAll(callables);
- FluentAsserts.assertThat(getOnlyElement(results)).isA(ListenableFutureTask.class);
+ ASSERT.that(getOnlyElement(results)).isA(ListenableFutureTask.class);
results = service.invokeAll(callables, 1, SECONDS);
- FluentAsserts.assertThat(getOnlyElement(results)).isA(ListenableFutureTask.class);
+ ASSERT.that(getOnlyElement(results)).isA(ListenableFutureTask.class);
/*
* TODO(cpovirk): move ForwardingTestCase somewhere common, and use it to
@@ -269,6 +286,132 @@ public class MoreExecutorsTest extends JSR166TestCase {
*/
}
+ public void testListeningDecorator_noWrapExecuteTask() {
+ ExecutorService delegate = mock(ExecutorService.class);
+ ListeningExecutorService service = listeningDecorator(delegate);
+ Runnable task = new Runnable() {
+ @Override
+ public void run() {}
+ };
+ service.execute(task);
+ verify(delegate).execute(task);
+ }
+
+ public void testListeningDecorator_scheduleSuccess() throws Exception {
+ final CountDownLatch completed = new CountDownLatch(1);
+ ScheduledThreadPoolExecutor delegate = new ScheduledThreadPoolExecutor(1) {
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ completed.countDown();
+ }
+ };
+ ListeningScheduledExecutorService service = listeningDecorator(delegate);
+ ListenableFuture<?> future =
+ service.schedule(Callables.returning(null), 1, TimeUnit.MILLISECONDS);
+
+ /*
+ * Wait not just until the Future's value is set (as in future.get()) but
+ * also until ListeningScheduledExecutorService's wrapper task is done
+ * executing listeners, as detected by yielding control to afterExecute.
+ */
+ completed.await();
+ assertTrue(future.isDone());
+ assertListenerRunImmediately(future);
+ assertEquals(0, delegate.getQueue().size());
+ }
+
+ public void testListeningDecorator_scheduleFailure() throws Exception {
+ ScheduledThreadPoolExecutor delegate = new ScheduledThreadPoolExecutor(1);
+ ListeningScheduledExecutorService service = listeningDecorator(delegate);
+ RuntimeException ex = new RuntimeException();
+ ListenableFuture<?> future =
+ service.schedule(new ThrowingRunnable(0, ex), 1, TimeUnit.MILLISECONDS);
+ assertExecutionException(future, ex);
+ assertEquals(0, delegate.getQueue().size());
+ }
+
+ public void testListeningDecorator_schedulePeriodic() throws Exception {
+ ScheduledThreadPoolExecutor delegate = new ScheduledThreadPoolExecutor(1);
+ ListeningScheduledExecutorService service = listeningDecorator(delegate);
+ RuntimeException ex = new RuntimeException();
+
+ ListenableFuture<?> future;
+
+ ThrowingRunnable runnable = new ThrowingRunnable(5, ex);
+ future = service.scheduleAtFixedRate(runnable, 1, 1, TimeUnit.MILLISECONDS);
+ assertExecutionException(future, ex);
+ assertEquals(5, runnable.count);
+ assertEquals(0, delegate.getQueue().size());
+
+ runnable = new ThrowingRunnable(5, ex);
+ future = service.scheduleWithFixedDelay(runnable, 1, 1, TimeUnit.MILLISECONDS);
+ assertExecutionException(future, ex);
+ assertEquals(5, runnable.count);
+ assertEquals(0, delegate.getQueue().size());
+ }
+
+ public void testListeningDecorator_cancelled() throws Exception {
+ ScheduledThreadPoolExecutor delegate = new ScheduledThreadPoolExecutor(1);
+ BlockingQueue<?> delegateQueue = delegate.getQueue();
+ ListeningScheduledExecutorService service = listeningDecorator(delegate);
+ ListenableFuture<?> future;
+ ScheduledFuture<?> delegateFuture;
+
+ Runnable runnable = new Runnable() {
+ @Override public void run() {}
+ };
+
+ future = service.schedule(runnable, 5 * 60, TimeUnit.SECONDS);
+ future.cancel(true);
+ assertTrue(future.isCancelled());
+ delegateFuture = (ScheduledFuture<?>) delegateQueue.element();
+ assertTrue(delegateFuture.isCancelled());
+
+ delegateQueue.clear();
+
+ future = service.scheduleAtFixedRate(runnable, 5 * 60, 5 * 60, TimeUnit.SECONDS);
+ future.cancel(true);
+ assertTrue(future.isCancelled());
+ delegateFuture = (ScheduledFuture<?>) delegateQueue.element();
+ assertTrue(delegateFuture.isCancelled());
+
+ delegateQueue.clear();
+
+ future = service.scheduleWithFixedDelay(runnable, 5 * 60, 5 * 60, TimeUnit.SECONDS);
+ future.cancel(true);
+ assertTrue(future.isCancelled());
+ delegateFuture = (ScheduledFuture<?>) delegateQueue.element();
+ assertTrue(delegateFuture.isCancelled());
+ }
+
+ private static final class ThrowingRunnable implements Runnable {
+ final int throwAfterCount;
+ final RuntimeException thrown;
+ int count;
+
+ ThrowingRunnable(int throwAfterCount, RuntimeException thrown) {
+ this.throwAfterCount = throwAfterCount;
+ this.thrown = thrown;
+ }
+
+ @Override
+ public void run() {
+ if (++count >= throwAfterCount) {
+ throw thrown;
+ }
+ }
+ }
+
+ private static void assertExecutionException(Future<?> future, Exception expectedCause)
+ throws Exception {
+ try {
+ future.get();
+ fail("Expected ExecutionException");
+ } catch (ExecutionException e) {
+ assertSame(expectedCause, e.getCause());
+ }
+ }
+
/**
* invokeAny(null) throws NPE
*/
@@ -447,8 +590,20 @@ public class MoreExecutorsTest extends JSR166TestCase {
assertEquals(factory.getClass(), Executors.defaultThreadFactory().getClass());
}
+ public void testThreadRenaming() {
+ Executor renamingExecutor = renamingDecorator(sameThreadExecutor(),
+ Suppliers.ofInstance("FooBar"));
+ String oldName = Thread.currentThread().getName();
+ renamingExecutor.execute(new Runnable() {
+ @Override public void run() {
+ assertEquals("FooBar", Thread.currentThread().getName());
+ }});
+ assertEquals(oldName, Thread.currentThread().getName());
+ }
+
public void testExecutors_nullCheck() throws Exception {
new ClassSanityTester()
+ .setDefault(RateLimiter.class, RateLimiter.create(1.0))
.forAllPublicStaticMethods(MoreExecutors.class)
.thatReturn(Executor.class)
.testNulls();
@@ -470,4 +625,69 @@ public class MoreExecutorsTest extends JSR166TestCase {
}
}
}
+
+ /* Half of a 1-second timeout in nanoseconds */
+ private static final long HALF_SECOND_NANOS = NANOSECONDS.convert(1L, SECONDS) / 2;
+
+ public void testShutdownAndAwaitTermination_immediateShutdown() throws Exception {
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ assertTrue(shutdownAndAwaitTermination(service, 1L, SECONDS));
+ assertTrue(service.isTerminated());
+ }
+
+ public void testShutdownAndAwaitTermination_immediateShutdownInternal() throws Exception {
+ ExecutorService service = mock(ExecutorService.class);
+ when(service.awaitTermination(HALF_SECOND_NANOS, NANOSECONDS)).thenReturn(true);
+ when(service.isTerminated()).thenReturn(true);
+ assertTrue(shutdownAndAwaitTermination(service, 1L, SECONDS));
+ verify(service).shutdown();
+ verify(service).awaitTermination(HALF_SECOND_NANOS, NANOSECONDS);
+ }
+
+ public void testShutdownAndAwaitTermination_forcedShutDownInternal() throws Exception {
+ ExecutorService service = mock(ExecutorService.class);
+ when(service.awaitTermination(HALF_SECOND_NANOS, NANOSECONDS))
+ .thenReturn(false).thenReturn(true);
+ when(service.isTerminated()).thenReturn(true);
+ assertTrue(shutdownAndAwaitTermination(service, 1L, SECONDS));
+ verify(service).shutdown();
+ verify(service, times(2)).awaitTermination(HALF_SECOND_NANOS, NANOSECONDS);
+ verify(service).shutdownNow();
+ }
+
+ public void testShutdownAndAwaitTermination_nonTerminationInternal() throws Exception {
+ ExecutorService service = mock(ExecutorService.class);
+ when(service.awaitTermination(HALF_SECOND_NANOS, NANOSECONDS))
+ .thenReturn(false).thenReturn(false);
+ assertFalse(shutdownAndAwaitTermination(service, 1L, SECONDS));
+ verify(service).shutdown();
+ verify(service, times(2)).awaitTermination(HALF_SECOND_NANOS, NANOSECONDS);
+ verify(service).shutdownNow();
+ }
+
+ public void testShutdownAndAwaitTermination_interruptedInternal() throws Exception {
+ final ExecutorService service = mock(ExecutorService.class);
+ when(service.awaitTermination(HALF_SECOND_NANOS, NANOSECONDS))
+ .thenThrow(new InterruptedException());
+
+ final AtomicBoolean terminated = new AtomicBoolean();
+ // we need to keep this in a flag because t.isInterrupted() returns false after t.join()
+ final AtomicBoolean interrupted = new AtomicBoolean();
+ // we need to use another thread because it will be interrupted and thus using
+ // the current one, owned by JUnit, would make the test fail
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ terminated.set(shutdownAndAwaitTermination(service, 1L, SECONDS));
+ interrupted.set(Thread.currentThread().isInterrupted());
+ }
+ });
+ thread.start();
+ thread.join();
+ verify(service).shutdown();
+ verify(service).awaitTermination(HALF_SECOND_NANOS, NANOSECONDS);
+ verify(service).shutdownNow();
+ assertTrue(interrupted.get());
+ assertFalse(terminated.get());
+ }
}