diff options
Diffstat (limited to 'guava-tests/test/com/google/common/util/concurrent/MoreExecutorsTest.java')
-rw-r--r-- | guava-tests/test/com/google/common/util/concurrent/MoreExecutorsTest.java | 240 |
1 files changed, 230 insertions, 10 deletions
diff --git a/guava-tests/test/com/google/common/util/concurrent/MoreExecutorsTest.java b/guava-tests/test/com/google/common/util/concurrent/MoreExecutorsTest.java index 2aad6b3..3b0af7f 100644 --- a/guava-tests/test/com/google/common/util/concurrent/MoreExecutorsTest.java +++ b/guava-tests/test/com/google/common/util/concurrent/MoreExecutorsTest.java @@ -31,24 +31,34 @@ package com.google.common.util.concurrent; import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.util.concurrent.MoreExecutors.invokeAnyImpl; import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; +import static com.google.common.util.concurrent.MoreExecutors.renamingDecorator; import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor; +import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.truth0.Truth.ASSERT; +import com.google.common.base.Suppliers; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.testing.ClassSanityTester; -import com.google.common.testing.FluentAsserts; import com.google.common.util.concurrent.MoreExecutors.Application; +import org.mockito.InOrder; +import org.mockito.Mockito; + import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -56,15 +66,14 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import org.mockito.InOrder; -import org.mockito.Mockito; - /** * Tests for MoreExecutors. * @@ -104,8 +113,8 @@ public class MoreExecutorsTest extends JSR166TestCase { Future<?> future = executor.submit(incrementTask); assertTrue(future.isDone()); assertEquals(1, threadLocalCount.get().intValue()); - } catch (Throwable Throwable) { - throwableFromOtherThread.set(Throwable); + } catch (Throwable t) { + throwableFromOtherThread.set(t); } } }); @@ -187,8 +196,8 @@ public class MoreExecutorsTest extends JSR166TestCase { assertTrue(future.isDone()); assertTrue(executor.isShutdown()); assertTrue(executor.isTerminated()); - } catch (Throwable Throwable) { - throwableFromOtherThread.set(Throwable); + } catch (Throwable t) { + throwableFromOtherThread.set(t); } }}); @@ -249,6 +258,14 @@ public class MoreExecutorsTest extends JSR166TestCase { } catch (RejectedExecutionException expected) {} } + public <T> void testListeningExecutorServiceInvokeAllJavadocCodeCompiles() + throws Exception { + ListeningExecutorService executor = MoreExecutors.sameThreadExecutor(); + List<Callable<T>> tasks = ImmutableList.of(); + @SuppressWarnings("unchecked") // guaranteed by invokeAll contract + List<ListenableFuture<T>> futures = (List) executor.invokeAll(tasks); + } + public void testListeningDecorator() throws Exception { ListeningExecutorService service = listeningDecorator(MoreExecutors.sameThreadExecutor()); @@ -258,10 +275,10 @@ public class MoreExecutorsTest extends JSR166TestCase { List<Future<String>> results; results = service.invokeAll(callables); - FluentAsserts.assertThat(getOnlyElement(results)).isA(ListenableFutureTask.class); + ASSERT.that(getOnlyElement(results)).isA(ListenableFutureTask.class); results = service.invokeAll(callables, 1, SECONDS); - FluentAsserts.assertThat(getOnlyElement(results)).isA(ListenableFutureTask.class); + ASSERT.that(getOnlyElement(results)).isA(ListenableFutureTask.class); /* * TODO(cpovirk): move ForwardingTestCase somewhere common, and use it to @@ -269,6 +286,132 @@ public class MoreExecutorsTest extends JSR166TestCase { */ } + public void testListeningDecorator_noWrapExecuteTask() { + ExecutorService delegate = mock(ExecutorService.class); + ListeningExecutorService service = listeningDecorator(delegate); + Runnable task = new Runnable() { + @Override + public void run() {} + }; + service.execute(task); + verify(delegate).execute(task); + } + + public void testListeningDecorator_scheduleSuccess() throws Exception { + final CountDownLatch completed = new CountDownLatch(1); + ScheduledThreadPoolExecutor delegate = new ScheduledThreadPoolExecutor(1) { + @Override + protected void afterExecute(Runnable r, Throwable t) { + completed.countDown(); + } + }; + ListeningScheduledExecutorService service = listeningDecorator(delegate); + ListenableFuture<?> future = + service.schedule(Callables.returning(null), 1, TimeUnit.MILLISECONDS); + + /* + * Wait not just until the Future's value is set (as in future.get()) but + * also until ListeningScheduledExecutorService's wrapper task is done + * executing listeners, as detected by yielding control to afterExecute. + */ + completed.await(); + assertTrue(future.isDone()); + assertListenerRunImmediately(future); + assertEquals(0, delegate.getQueue().size()); + } + + public void testListeningDecorator_scheduleFailure() throws Exception { + ScheduledThreadPoolExecutor delegate = new ScheduledThreadPoolExecutor(1); + ListeningScheduledExecutorService service = listeningDecorator(delegate); + RuntimeException ex = new RuntimeException(); + ListenableFuture<?> future = + service.schedule(new ThrowingRunnable(0, ex), 1, TimeUnit.MILLISECONDS); + assertExecutionException(future, ex); + assertEquals(0, delegate.getQueue().size()); + } + + public void testListeningDecorator_schedulePeriodic() throws Exception { + ScheduledThreadPoolExecutor delegate = new ScheduledThreadPoolExecutor(1); + ListeningScheduledExecutorService service = listeningDecorator(delegate); + RuntimeException ex = new RuntimeException(); + + ListenableFuture<?> future; + + ThrowingRunnable runnable = new ThrowingRunnable(5, ex); + future = service.scheduleAtFixedRate(runnable, 1, 1, TimeUnit.MILLISECONDS); + assertExecutionException(future, ex); + assertEquals(5, runnable.count); + assertEquals(0, delegate.getQueue().size()); + + runnable = new ThrowingRunnable(5, ex); + future = service.scheduleWithFixedDelay(runnable, 1, 1, TimeUnit.MILLISECONDS); + assertExecutionException(future, ex); + assertEquals(5, runnable.count); + assertEquals(0, delegate.getQueue().size()); + } + + public void testListeningDecorator_cancelled() throws Exception { + ScheduledThreadPoolExecutor delegate = new ScheduledThreadPoolExecutor(1); + BlockingQueue<?> delegateQueue = delegate.getQueue(); + ListeningScheduledExecutorService service = listeningDecorator(delegate); + ListenableFuture<?> future; + ScheduledFuture<?> delegateFuture; + + Runnable runnable = new Runnable() { + @Override public void run() {} + }; + + future = service.schedule(runnable, 5 * 60, TimeUnit.SECONDS); + future.cancel(true); + assertTrue(future.isCancelled()); + delegateFuture = (ScheduledFuture<?>) delegateQueue.element(); + assertTrue(delegateFuture.isCancelled()); + + delegateQueue.clear(); + + future = service.scheduleAtFixedRate(runnable, 5 * 60, 5 * 60, TimeUnit.SECONDS); + future.cancel(true); + assertTrue(future.isCancelled()); + delegateFuture = (ScheduledFuture<?>) delegateQueue.element(); + assertTrue(delegateFuture.isCancelled()); + + delegateQueue.clear(); + + future = service.scheduleWithFixedDelay(runnable, 5 * 60, 5 * 60, TimeUnit.SECONDS); + future.cancel(true); + assertTrue(future.isCancelled()); + delegateFuture = (ScheduledFuture<?>) delegateQueue.element(); + assertTrue(delegateFuture.isCancelled()); + } + + private static final class ThrowingRunnable implements Runnable { + final int throwAfterCount; + final RuntimeException thrown; + int count; + + ThrowingRunnable(int throwAfterCount, RuntimeException thrown) { + this.throwAfterCount = throwAfterCount; + this.thrown = thrown; + } + + @Override + public void run() { + if (++count >= throwAfterCount) { + throw thrown; + } + } + } + + private static void assertExecutionException(Future<?> future, Exception expectedCause) + throws Exception { + try { + future.get(); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + assertSame(expectedCause, e.getCause()); + } + } + /** * invokeAny(null) throws NPE */ @@ -447,8 +590,20 @@ public class MoreExecutorsTest extends JSR166TestCase { assertEquals(factory.getClass(), Executors.defaultThreadFactory().getClass()); } + public void testThreadRenaming() { + Executor renamingExecutor = renamingDecorator(sameThreadExecutor(), + Suppliers.ofInstance("FooBar")); + String oldName = Thread.currentThread().getName(); + renamingExecutor.execute(new Runnable() { + @Override public void run() { + assertEquals("FooBar", Thread.currentThread().getName()); + }}); + assertEquals(oldName, Thread.currentThread().getName()); + } + public void testExecutors_nullCheck() throws Exception { new ClassSanityTester() + .setDefault(RateLimiter.class, RateLimiter.create(1.0)) .forAllPublicStaticMethods(MoreExecutors.class) .thatReturn(Executor.class) .testNulls(); @@ -470,4 +625,69 @@ public class MoreExecutorsTest extends JSR166TestCase { } } } + + /* Half of a 1-second timeout in nanoseconds */ + private static final long HALF_SECOND_NANOS = NANOSECONDS.convert(1L, SECONDS) / 2; + + public void testShutdownAndAwaitTermination_immediateShutdown() throws Exception { + ExecutorService service = Executors.newSingleThreadExecutor(); + assertTrue(shutdownAndAwaitTermination(service, 1L, SECONDS)); + assertTrue(service.isTerminated()); + } + + public void testShutdownAndAwaitTermination_immediateShutdownInternal() throws Exception { + ExecutorService service = mock(ExecutorService.class); + when(service.awaitTermination(HALF_SECOND_NANOS, NANOSECONDS)).thenReturn(true); + when(service.isTerminated()).thenReturn(true); + assertTrue(shutdownAndAwaitTermination(service, 1L, SECONDS)); + verify(service).shutdown(); + verify(service).awaitTermination(HALF_SECOND_NANOS, NANOSECONDS); + } + + public void testShutdownAndAwaitTermination_forcedShutDownInternal() throws Exception { + ExecutorService service = mock(ExecutorService.class); + when(service.awaitTermination(HALF_SECOND_NANOS, NANOSECONDS)) + .thenReturn(false).thenReturn(true); + when(service.isTerminated()).thenReturn(true); + assertTrue(shutdownAndAwaitTermination(service, 1L, SECONDS)); + verify(service).shutdown(); + verify(service, times(2)).awaitTermination(HALF_SECOND_NANOS, NANOSECONDS); + verify(service).shutdownNow(); + } + + public void testShutdownAndAwaitTermination_nonTerminationInternal() throws Exception { + ExecutorService service = mock(ExecutorService.class); + when(service.awaitTermination(HALF_SECOND_NANOS, NANOSECONDS)) + .thenReturn(false).thenReturn(false); + assertFalse(shutdownAndAwaitTermination(service, 1L, SECONDS)); + verify(service).shutdown(); + verify(service, times(2)).awaitTermination(HALF_SECOND_NANOS, NANOSECONDS); + verify(service).shutdownNow(); + } + + public void testShutdownAndAwaitTermination_interruptedInternal() throws Exception { + final ExecutorService service = mock(ExecutorService.class); + when(service.awaitTermination(HALF_SECOND_NANOS, NANOSECONDS)) + .thenThrow(new InterruptedException()); + + final AtomicBoolean terminated = new AtomicBoolean(); + // we need to keep this in a flag because t.isInterrupted() returns false after t.join() + final AtomicBoolean interrupted = new AtomicBoolean(); + // we need to use another thread because it will be interrupted and thus using + // the current one, owned by JUnit, would make the test fail + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + terminated.set(shutdownAndAwaitTermination(service, 1L, SECONDS)); + interrupted.set(Thread.currentThread().isInterrupted()); + } + }); + thread.start(); + thread.join(); + verify(service).shutdown(); + verify(service).awaitTermination(HALF_SECOND_NANOS, NANOSECONDS); + verify(service).shutdownNow(); + assertTrue(interrupted.get()); + assertFalse(terminated.get()); + } } |