diff options
author | Tyson Henning <yorick@google.com> | 2019-07-16 13:19:17 -0700 |
---|---|---|
committer | Roman Elizarov <elizarov@gmail.com> | 2019-08-29 17:52:40 +0300 |
commit | 57cc36454da6851ecd2a00abe7cfd6ee6e06928f (patch) | |
tree | 97e8b2f59a2615a76889a5f6ac96f534e23d3c0c | |
parent | 95d88ab58b0008b7ed5b8c580902c4719da3e864 (diff) | |
download | platform_external_kotlinx.coroutines-57cc36454da6851ecd2a00abe7cfd6ee6e06928f.tar.gz platform_external_kotlinx.coroutines-57cc36454da6851ecd2a00abe7cfd6ee6e06928f.tar.bz2 platform_external_kotlinx.coroutines-57cc36454da6851ecd2a00abe7cfd6ee6e06928f.zip |
Repaired some of ListenableFuture.kt's cancellation corner cases.
This fixes:
- Cancellation without an untrapped CancellationException propagating
through a Callback; isCancelled() is the correct way to check for
cancellation
- Bidirectional propagation of cancellation through
`asListenableFuture()`
- The cause getting lost in the `asListenableFuture()` future when
cancelling its `Deferred` parent with a cause
This also:
- Extensively documents the package and the contracts created by the
promise-creating extension methods and `future()`
- Uses `getUninterruptibly()` for speed
- Uses `AbstractFuture` to make as certain as possible that
`Future.cancel()` will return `true` at most once
- Should clear up rare spooky race conditions around
cancellation/interruption in hybrid Coroutines/Guava Futures
codebases
There are probably a few more interesting corner cases hiding in here,
but this should be a good start improving the correctness of `.guava`'s
adapters.
This is a squash commit of kotlin/pr/1347, rebased on develop:
- Incorporated first-round feedback.
- Merged CancellationToCoroutine into ListenableFutureCoroutine to save an
allocation.
- Documented and tested for null completion of asDeferred()'s parent
Future.
- Renamed a cancellation test case for clarity of purpose.
- Split asDeferred() documentation between KDoc/details
- Implemented InternalFutures faster-fast path. Documented.
-rw-r--r-- | integration/kotlinx-coroutines-guava/src/ListenableFuture.kt | 486 | ||||
-rw-r--r-- | integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt | 202 |
2 files changed, 596 insertions, 92 deletions
diff --git a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt index a924bb4a..de47c760 100644 --- a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt +++ b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt @@ -5,31 +5,40 @@ package kotlinx.coroutines.guava import com.google.common.util.concurrent.* -import kotlinx.coroutines.* +import com.google.common.util.concurrent.internal.InternalFutureFailureAccess +import com.google.common.util.concurrent.internal.InternalFutures import java.util.concurrent.* -import java.util.concurrent.CancellationException import kotlin.coroutines.* +import kotlinx.coroutines.* +import java.util.concurrent.CancellationException /** - * Starts new coroutine and returns its results an an implementation of [ListenableFuture]. - * The running coroutine is cancelled when the resulting future is cancelled or otherwise completed. + * Starts [block] in a new coroutine and returns a [ListenableFuture] pointing to its result. + * + * The coroutine is immediately started. Passing [CoroutineStart.LAZY] to [start] throws + * [IllegalArgumentException], because Futures don't have a way to start lazily. * - * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument. - * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. - * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden - * with corresponding [coroutineContext] element. + * The created coroutine is cancelled when the resulting future completes successfully, fails, or + * is cancelled. * - * By default, the coroutine is immediately scheduled for execution. - * Other options can be specified via `start` parameter. See [CoroutineStart] for details. - * A value of [CoroutineStart.LAZY] is not supported - * (since `ListenableFuture` framework does not provide the corresponding capability) and - * produces [IllegalArgumentException]. + * `CoroutineContext` is inherited from this [CoroutineScope]. Additional context elements can be + * added/overlaid by passing [context]. * - * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine. + * If the context does not have a [CoroutineDispatcher], nor any other [ContinuationInterceptor] + * member, [Dispatchers.Default] is used. * - * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine. + * The parent job is inherited from this [CoroutineScope], and can be overridden by passing + * a [Job] in [context]. + * + * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging + * facilities. + * + * Note that the error and cancellation semantics of [future] are _subtly different_ than + * [asListenableFuture]'s. See [ListenableFutureCoroutine] for details. + * + * @param context added overlaying [CoroutineScope.coroutineContext] to form the new context. * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT]. - * @param block the coroutine code. + * @param block the code to execute. */ public fun <T> CoroutineScope.future( context: CoroutineContext = EmptyCoroutineContext, @@ -38,81 +47,98 @@ public fun <T> CoroutineScope.future( ): ListenableFuture<T> { require(!start.isLazy) { "$start start is not supported" } val newContext = newCoroutineContext(context) + // TODO: It'd be nice not to leak this SettableFuture reference, which is easily blind-cast. val future = SettableFuture.create<T>() val coroutine = ListenableFutureCoroutine(newContext, future) - Futures.addCallback(future, coroutine, MoreExecutors.directExecutor()) + future.addListener( + coroutine, + MoreExecutors.directExecutor()) coroutine.start(start, coroutine, block) return future } -private class ListenableFutureCoroutine<T>( - context: CoroutineContext, - private val future: SettableFuture<T> -) : AbstractCoroutine<T>(context), FutureCallback<T> { - /* - * We register coroutine as callback to the future this coroutine completes. - * But when future is cancelled externally, we'd like to cancel coroutine, - * so we register on failure handler for this purpose - */ - override fun onSuccess(result: T?) { - // Do nothing - } - - override fun onFailure(t: Throwable) { - if (t is CancellationException) { - cancel() - } - } - - override fun onCompleted(value: T) { - future.set(value) - } - - override fun onCancelled(cause: Throwable, handled: Boolean) { - if (!future.setException(cause) && !handled) { - // prevents loss of exception that was not handled by parent & could not be set to SettableFuture - handleCoroutineException(context, cause) - } - } -} - /** - * Converts this deferred value to the instance of [ListenableFuture]. - * The deferred value is cancelled when the resulting future is cancelled or otherwise completed. + * Returns a [Deferred] that is completed or failed by `this` [ListenableFuture]. + * + * Completion is non-atomic between the two promises. + * + * Cancellation is propagated bidirectionally. + * + * When `this` `ListenableFuture` completes (either successfully or exceptionally) it will try to + * complete the returned `Deferred` with the same value or exception. This will succeed, barring a + * race with cancellation of the `Deferred`. + * + * When `this` `ListenableFuture` is [successfully cancelled][java.util.concurrent.Future.cancel], + * it will cancel the returned `Deferred`. + * + * When the returned `Deferred` is [cancelled][Deferred.cancel()], it will try to propagate the + * cancellation to `this` `ListenableFuture`. Propagation will succeed, barring a race with the + * `ListenableFuture` completing normally. This is the only case in which the returned `Deferred` + * will complete with a different outcome than `this` `ListenableFuture`. */ -public fun <T> Deferred<T>.asListenableFuture(): ListenableFuture<T> = DeferredListenableFuture<T>(this) - -private class DeferredListenableFuture<T>( - private val deferred: Deferred<T> -) : AbstractFuture<T>() { - init { - deferred.invokeOnCompletion { - try { - set(deferred.getCompleted()) - } catch (t: Throwable) { - setException(t) +public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> { + /* This method creates very specific behaviour as it entangles the `Deferred` and + * `ListenableFuture`. This behaviour is the best discovered compromise between the possible + * states and interface contracts of a `Future` and the states of a `Deferred`. The specific + * behaviour is described here. + * + * When `this` `ListenableFuture` is successfully cancelled - meaning + * `ListenableFuture.cancel()` returned `true` - it will synchronously cancel the returned + * `Deferred`. This can only race with cancellation of the returned `Deferred`, so the + * `Deferred` will always be put into its "cancelling" state and (barring uncooperative + * cancellation) _eventually_ reach its "cancelled" state when either promise is successfully + * cancelled. + * + * When the returned `Deferred` is cancelled, `ListenableFuture.cancel()` will be synchronously + * called on `this` `ListenableFuture`. This will attempt to cancel the `Future`, though + * cancellation may not succeed and the `ListenableFuture` may complete in a non-cancelled + * terminal state. + * + * The returned `Deferred` may receive and suppress the `true` return value from + * `ListenableFuture.cancel()` when the task is cancelled via the `Deferred` reference to it. + * This is unavoidable, so make sure no idempotent cancellation work is performed by a + * reference-holder of the `ListenableFuture` task. The idempotent work won't get done if + * cancellation was from the `Deferred` representation of the task. + * + * This is inherently a race. See `Future.cancel()` for a description of `Future` cancellation + * semantics. See `Job` for a description of coroutine cancellation semantics. + */ + // First, try the fast-fast error path for Guava ListenableFutures. This will save allocating an + // Exception by using the same instance the Future created. + if (this is InternalFutureFailureAccess) { + val t: Throwable? = InternalFutures.tryInternalFastPathGetFailure(this) + if (t != null) { + return CompletableDeferred<T>().also { + it.completeExceptionally(t) } } } - override fun interruptTask() { deferred.cancel() } -} -/** - * Converts this listenable future to an instance of [Deferred]. - * It is cancelled when the resulting deferred is cancelled. - */ -public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> { - // Fast path if already completed + // Second, try the fast path for a completed Future. The Future is known to be done, so get() + // will not block, and thus it won't be interrupted. Calling getUninterruptibly() instead of + // getDone() in this known-non-interruptible case saves the volatile read that getDone() uses to + // handle interruption. if (isDone) { return try { - @Suppress("UNCHECKED_CAST") - CompletableDeferred(get() as T) - } catch (e: Throwable) { - // unwrap original cause from ExecutionException - val original = (e as? ExecutionException)?.cause ?: e - CompletableDeferred<T>().also { it.completeExceptionally(original) } + val value = Uninterruptibles.getUninterruptibly(this) + if (value == null) { + CompletableDeferred<T>().also { + it.completeExceptionally(KotlinNullPointerException()) + } + } else { + CompletableDeferred(value) + } + } catch (e: CancellationException) { + CompletableDeferred<T>().also { it.cancel(e) } + } catch (e: ExecutionException) { + // ExecutionException is the only kind of exception that can be thrown from a gotten + // Future. Anything else showing up here indicates a very fundamental bug in a + // Future implementation. + CompletableDeferred<T>().also { it.completeExceptionally(e.nonNullCause()) } } } + + // Finally, if this isn't done yet, attach a Listener that will complete the Deferred. val deferred = CompletableDeferred<T>() Futures.addCallback(this, object : FutureCallback<T> { override fun onSuccess(result: T?) { @@ -124,41 +150,319 @@ public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> { } }, MoreExecutors.directExecutor()) - deferred.invokeOnCompletion { cancel(false) } + // ... And cancel the Future when the deferred completes. Since the return type of this method + // is Deferred, the only interaction point from the caller is to cancel the Deferred. If this + // completion handler runs before the Future is completed, the Deferred must have been + // cancelled and should propagate its cancellation. If it runs after the Future is completed, + // this is a no-op. + deferred.invokeOnCompletion { + cancel(false) + } return deferred } /** - * Awaits for completion of the future without blocking a thread. + * Returns the cause from an [ExecutionException] thrown by a [Future.get] or similar. + * + * [ExecutionException] _always_ wraps a non-null cause when Future.get() throws. A Future cannot + * fail without a non-null `cause`, because the only way a Future _can_ fail is an uncaught + * [Exception]. + * + * If this !! throws [NullPointerException], a Future is breaking its interface contract and losing + * state - a serious fundamental bug. + */ +private fun ExecutionException.nonNullCause(): Throwable { + return this.cause!! +} + +/** + * Returns a [ListenableFuture] that is completed or failed by `this` [Deferred]. + * + * Completion is non-atomic between the two promises. + * + * When either promise successfully completes, it will attempt to synchronously complete its + * counterpart with the same value. This will succeed barring a race with cancellation. + * + * When either promise completes with an Exception, it will attempt to synchronously complete its + * counterpart with the same Exception. This will succeed barring a race with cancellation. + * + * Cancellation is propagated bidirectionally. + * + * When the returned [Future] is successfully cancelled - meaning [Future.cancel] returned true - + * [Deferred.cancel] will be synchronously called on `this` [Deferred]. This will attempt to cancel + * the `Deferred`, though cancellation may not succeed and the `Deferred` may complete in a + * non-cancelled terminal state. + * + * When `this` `Deferred` reaches its "cancelled" state with a successful cancellation - meaning it + * completes with [kotlinx.coroutines.CancellationException] - `this` `Deferred` will synchronously + * cancel the returned `Future`. This can only race with cancellation of the returned `Future`, so + * the returned `Future` will always _eventually_ reach its cancelled state when either promise is + * successfully cancelled, for their different meanings of "successfully cancelled". + * + * This is inherently a race. See [Future.cancel] for a description of `Future` cancellation + * semantics. See [Job] for a description of coroutine cancellation semantics. See + * [DeferredListenableFuture.cancel] for greater detail on the overlapped cancellation semantics and + * corner cases of this method. + */ +public fun <T> Deferred<T>.asListenableFuture(): ListenableFuture<T> { + val outerFuture = OuterFuture<T>(this) + outerFuture.afterInit() + return outerFuture +} + +/** + * Awaits completion of `this` [ListenableFuture] without blocking a thread. + * + * This suspend function is cancellable. * - * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * stops waiting for the future and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException]. * - * This method is intended to be used with one-shot futures, so on coroutine cancellation future is cancelled as well. - * If cancelling given future is undesired, `future.asDeferred().await()` should be used instead. + * This method is intended to be used with one-shot Futures, so on coroutine cancellation, the Future is cancelled as well. + * If cancelling the given future is undesired, use [Futures.nonCancellationPropagating] or + * [kotlinx.coroutines.NonCancellable]. + * */ public suspend fun <T> ListenableFuture<T>.await(): T { try { - if (isDone) return get() as T + if (isDone) return Uninterruptibles.getUninterruptibly(this) } catch (e: ExecutionException) { - throw e.cause ?: e // unwrap original cause from ExecutionException + // ExecutionException is the only kind of exception that can be thrown from a gotten + // Future, other than CancellationException. Cancellation is propagated upward so that + // the coroutine running this suspend function may process it. + // Any other Exception showing up here indicates a very fundamental bug in a + // Future implementation. + throw e.nonNullCause() } return suspendCancellableCoroutine { cont: CancellableContinuation<T> -> - val callback = ContinuationCallback(cont) - Futures.addCallback(this, callback, MoreExecutors.directExecutor()) + addListener( + ToContinuation(this, cont), + MoreExecutors.directExecutor()) cont.invokeOnCancellation { cancel(false) - callback.cont = null // clear the reference to continuation from the future's callback } } } -private class ContinuationCallback<T>( - @Volatile @JvmField var cont: Continuation<T>? -) : FutureCallback<T> { - @Suppress("UNCHECKED_CAST") - override fun onSuccess(result: T?) { cont?.resume(result as T) } - override fun onFailure(t: Throwable) { cont?.resumeWithException(t) } +/** + * Propagates the outcome of [futureToObserve] to [continuation] on completion. + * + * Cancellation is propagated as cancelling the continuation. If [futureToObserve] completes + * and fails, the cause of the Future will be propagated without a wrapping + * [ExecutionException] when thrown. + */ +private class ToContinuation<T>( + val futureToObserve: ListenableFuture<T>, + val continuation: CancellableContinuation<T> +): Runnable { + override fun run() { + if (futureToObserve.isCancelled) { + continuation.cancel() + } else { + try { + continuation.resumeWith( + Result.success(Uninterruptibles.getUninterruptibly(futureToObserve))) + } catch (e: ExecutionException) { + // ExecutionException is the only kind of exception that can be thrown from a gotten + // Future. Anything else showing up here indicates a very fundamental bug in a + // Future implementation. + continuation.resumeWithException(e.nonNullCause()) + } + } + } +} + +/** + * An [AbstractCoroutine] intended for use directly creating a [ListenableFuture] handle to + * completion. + * + * The code in the [Runnable] portion of the class is registered as a [ListenableFuture] callback. + * See [run] for details. Both types are implemented by this object to save an allocation. + */ +private class ListenableFutureCoroutine<T>( + context: CoroutineContext, + private val future: SettableFuture<T> +) : AbstractCoroutine<T>(context), Runnable { + + /** + * When registered as a [ListenableFuture] listener, cancels the returned [Coroutine] if + * [future] is successfully cancelled. By documented contract, a [Future] has been cancelled if + * and only if its `isCancelled()` method returns true. + * + * Any error that occurs after successfully cancelling a [ListenableFuture] + * created by submitting the returned object as a [Runnable] to an `Executor` will be passed + * to the [CoroutineExceptionHandler] from the context. The contract of [Future] does not permit + * it to return an error after it is successfully cancelled. + * + * By calling [asListenableFuture] on a [Deferred], any error that occurs after successfully + * cancelling the [ListenableFuture] representation of the [Deferred] will _not_ be passed to + * the [CoroutineExceptionHandler]. Cancelling a [Deferred] places that [Deferred] in the + * cancelling/cancelled states defined by [Job], which _can_ show the error. It's assumed that + * the [Deferred] pointing to the task will be used to observe any error outcome occurring after + * cancellation. + * + * This may be counterintuitive, but it maintains the error and cancellation contracts of both + * the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point + * to the same running task. + */ + override fun run() { + if (future.isCancelled) { + cancel() + } + } + + override fun onCompleted(value: T) { + future.set(value) + } + + // TODO: This doesn't actually cancel the Future. There doesn't seem to be bidi cancellation? + override fun onCancelled(cause: Throwable, handled: Boolean) { + if (!future.setException(cause) && !handled) { + // prevents loss of exception that was not handled by parent & could not be set to SettableFuture + handleCoroutineException(context, cause) + } + } +} + +/** + * A [ListenableFuture] that delegates to an internal [DeferredListenableFuture], collaborating with + * it. + * + * This setup allows the returned [ListenableFuture] to maintain the following properties: + * + * - Correct implementation of [Future]'s happens-after semantics documented for [get], [isDone] + * and [isCancelled] methods + * - Cancellation propagation both to and from [Deferred] + * - Correct cancellation and completion semantics even when this [ListenableFuture] is combined + * with different concrete implementations of [ListenableFuture] + * - Fully correct cancellation and listener happens-after obeying [Future] and + * [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to achieve. + * The best way to be correct, especially given the fun corner cases from + * [AsyncFuture.setAsync], is to just use an [AsyncFuture]. + * - To maintain sanity, this class implements [ListenableFuture] and uses an inner [AsyncFuture] + * around its input [deferred] as a state engine to establish happens-after-completion. This + * could probably be compressed into one subclass of [AsyncFuture] to save an allocation, at the + * cost of the implementation's readability. + */ +private class OuterFuture<T>(private val deferred: Deferred<T>): ListenableFuture<T> { + val innerFuture = DeferredListenableFuture(deferred) + + // Adding the listener after initialization resolves partial construction hairpin problem. + // + // This invokeOnCompletion completes the innerFuture as `deferred` does. The innerFuture may + // have completed earlier if it got cancelled! See DeferredListenableFuture. + fun afterInit() { + deferred.invokeOnCompletion { + innerFuture.complete() + } + } + + /** + * Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to + * [Job.isCancelled]. + * + * When done, this Future is cancelled if its innerFuture is cancelled, or if its delegate + * [deferred] is cancelled. Cancellation of [innerFuture] collaborates with this class. + * + * See [DeferredListenableFuture.cancel]. + */ + override fun isCancelled(): Boolean { + // This expression ensures that isCancelled() will *never* return true when isDone() returns false. + // In the case that the deferred has completed with cancellation, completing `this`, its + // reaching the "cancelled" state with a cause of CancellationException is treated as the + // same thing as innerFuture getting cancelled. If the Job is in the "cancelling" state and + // this Future hasn't itself been successfully cancelled, the Future will return + // isCancelled() == false. This is the only discovered way to reconcile the two different + // cancellation contracts. + return isDone + && (innerFuture.isCancelled + || deferred.getCompletionExceptionOrNull() is kotlinx.coroutines.CancellationException) + } + + /** + * Waits for [innerFuture] to complete by blocking, then uses the [deferred] returned by that + * Future to get the `T` value `this` [ListenableFuture] is pointing to. This establishes + * happens-after ordering for completion of the [Deferred] input to [OuterFuture]. + * + * `innerFuture` _must be complete_ in order for the [isDone] and [isCancelled] happens-after + * contract of [Future] to be correctly followed. If this method were to directly use + * _`this.deferred`_ instead of blocking on its `innerFuture`, the [Deferred] that this + * [ListenableFuture] is created from might be in an incomplete state when used by `get()`. + */ + override fun get(): T { + return getInternal(innerFuture.get()) + } + + /** See [get()]. */ + override fun get(timeout: Long, unit: TimeUnit): T { + return getInternal(innerFuture.get(timeout, unit)) + } + + /** See [get()]. */ + private fun getInternal(deferred: Deferred<T>): T { + if (deferred.isCancelled) { + val exception = deferred.getCompletionExceptionOrNull() + if (exception is kotlinx.coroutines.CancellationException) { + throw exception + } else { + throw ExecutionException(exception) + } + } else { + return deferred.getCompleted() + } + } + + override fun addListener(listener: Runnable, executor: Executor) { + innerFuture.addListener(listener, executor) + } + + override fun isDone(): Boolean { + return innerFuture.isDone + } + + override fun cancel(mayInterruptIfRunning: Boolean): Boolean { + return innerFuture.cancel(mayInterruptIfRunning) + } +} + +/** + * Holds a delegate deferred, and serves as a state machine for [Future] cancellation. + * + * [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and + * cancellation semantics. By using that type, the [OuterFuture] can delegate its semantics to + * _this_ `Future` `get()` the result in such a way that the `Deferred` is always complete when + * returned. + */ +private class DeferredListenableFuture<T>( + private val deferred: Deferred<T> +) : AbstractFuture<Deferred<T>>() { + + fun complete() { + set(deferred) + } + + /** + * Tries to cancel the task. This is fundamentally racy. + * + * For any given call to `cancel()`, if [deferred] is already completed, the call will complete + * this Future with it, and fail to cancel. Otherwise, the + * call to `cancel()` will try to cancel this Future: if and only if cancellation of this + * succeeds, [deferred] will have its [Deferred.cancel] called. + * + * This arrangement means that [deferred] _might not successfully cancel_, if the race resolves + * in a particular way. [deferred] may also be in its "cancelling" state while this + * ListenableFuture is complete and cancelled. + * + * [OuterFuture] collaborates with this class to present a more cohesive picture and ensure + * that certain combinations of cancelled/cancelling states can't be observed. + */ + override fun cancel(mayInterruptIfRunning: Boolean): Boolean { + return if (super.cancel(mayInterruptIfRunning)) { + deferred.cancel() + true + } else { + false + } + } } diff --git a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt index cf82318a..1f96dfdb 100644 --- a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt +++ b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt @@ -6,12 +6,12 @@ package kotlinx.coroutines.guava import com.google.common.util.concurrent.* import kotlinx.coroutines.* -import kotlinx.coroutines.CancellationException import org.hamcrest.core.* import org.junit.* import org.junit.Assert.* import org.junit.Test import java.util.concurrent.* +import java.util.concurrent.CancellationException class ListenableFutureTest : TestBase() { @Before @@ -190,6 +190,122 @@ class ListenableFutureTest : TestBase() { } @Test + fun testFutureAwaitCancellationPropagatingToDeferred() = runTest { + + val latch = CountDownLatch(1) + val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) + val future = executor.submit(Callable { latch.await(); 42 }) + val deferred = async { + expect(2) + future.await() + } + expect(1) + yield() + future.cancel(/*mayInterruptIfRunning=*/true) + expect(3) + latch.countDown() + deferred.join() + assertTrue(future.isCancelled) + assertTrue(deferred.isCancelled) + assertFailsWith<CancellationException> { future.get() } + finish(4) + } + + @Test + fun testFutureAwaitCancellationPropagatingToDeferredNoInterruption() = runTest { + + val latch = CountDownLatch(1) + val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) + val future = executor.submit(Callable { latch.await(); 42 }) + val deferred = async { + expect(2) + future.await() + } + expect(1) + yield() + future.cancel(/*mayInterruptIfRunning=*/false) + expect(3) + latch.countDown() + deferred.join() + assertTrue(future.isCancelled) + assertTrue(deferred.isCancelled) + assertFailsWith<CancellationException> { future.get() } + finish(4) + } + + @Test + fun testAsListenableFutureCancellationPropagatingToDeferred() = runTest { + val latch = CountDownLatch(1) + val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) + val future = executor.submit(Callable { latch.await(); 42 }) + val deferred = async { + expect(2) + future.await() + } + val asListenableFuture = deferred.asListenableFuture() + expect(1) + yield() + asListenableFuture.cancel(/*mayInterruptIfRunning=*/true) + expect(3) + latch.countDown() + deferred.join() + assertTrue(future.isCancelled) + assertTrue(deferred.isCancelled) + assertTrue(asListenableFuture.isCancelled) + assertFailsWith<CancellationException> { future.get() } + finish(4) + } + + @Test + fun testAsListenableFutureCancellationPropagatingToDeferredNoInterruption() = runTest { + val latch = CountDownLatch(1) + val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) + val future = executor.submit(Callable { latch.await(); 42 }) + val deferred = async { + expect(2) + future.await() + } + val asListenableFuture = deferred.asListenableFuture() + expect(1) + yield() + asListenableFuture.cancel(/*mayInterruptIfRunning=*/false) + expect(3) + latch.countDown() + deferred.join() + assertFailsWith<CancellationException> { asListenableFuture.get() } + assertTrue(future.isCancelled) + assertTrue(asListenableFuture.isCancelled) + assertTrue(deferred.isCancelled) + assertFailsWith<CancellationException> { future.get() } + finish(4) + } + + @Test + fun testAsListenableFutureCancellationThroughSetFuture() = runTest { + val latch = CountDownLatch(1) + val future = SettableFuture.create<Void>() + val deferred = async { + expect(2) + future.await() + } + val asListenableFuture = deferred.asListenableFuture() + expect(1) + yield() + future.setFuture(Futures.immediateCancelledFuture()) + expect(3) + latch.countDown() + deferred.join() + assertFailsWith<CancellationException> { asListenableFuture.get() } + // Future was not interrupted, but also wasn't blocking, so it will be successfully + // cancelled by its parent Coroutine. + assertTrue(future.isCancelled) + assertTrue(asListenableFuture.isCancelled) + assertTrue(deferred.isCancelled) + assertFailsWith<CancellationException> { future.get() } + finish(4) + } + + @Test fun testFutureCancellation() = runTest { val future = awaitFutureWithCancel(true) assertTrue(future.isCancelled) @@ -198,6 +314,21 @@ class ListenableFutureTest : TestBase() { } @Test + fun testAsListenableDeferredCancellationCauseAndMessagePropagate() = runTest { + val deferred = CompletableDeferred<Int>() + val inputCancellationException = CancellationException("Foobar") + inputCancellationException.initCause(OutOfMemoryError("Foobaz")) + deferred.cancel(inputCancellationException) + val asFuture = deferred.asListenableFuture() + + val outputCancellationException = + assertFailsWith<CancellationException> { asFuture.get() } + assertEquals(outputCancellationException.message, "Foobar") + assertTrue(outputCancellationException.cause is OutOfMemoryError) + assertEquals(outputCancellationException.cause?.message, "Foobaz") + } + + @Test fun testNoFutureCancellation() = runTest { val future = awaitFutureWithCancel(false) assertFalse(future.isCancelled) @@ -206,6 +337,59 @@ class ListenableFutureTest : TestBase() { } @Test + fun testCancelledDeferredAsListenableFutureAwaitThrowsCancellation() = runTest { + val future = Futures.immediateCancelledFuture<Int>() + val asDeferred = future.asDeferred() + val asDeferredAsFuture = asDeferred.asListenableFuture() + + assertTrue(asDeferredAsFuture.isCancelled) + assertFailsWith<CancellationException> { + val value: Int = asDeferredAsFuture.await() + } + } + + @Test + fun testCancelledDeferredAsListenableFutureAsDeferredPassesCancellationAlong() = runTest { + val deferred = CompletableDeferred<Int>() + deferred.completeExceptionally(CancellationException()) + val asFuture = deferred.asListenableFuture() + val asFutureAsDeferred = asFuture.asDeferred() + + assertTrue(asFutureAsDeferred.isCancelled) + assertTrue(asFutureAsDeferred.isCompleted) + // By documentation, join() shouldn't throw when asDeferred is already complete. + asFutureAsDeferred.join() + assertThat( + asFutureAsDeferred.getCompletionExceptionOrNull(), + IsInstanceOf(CancellationException::class.java)) + } + + @Test + fun testCancelledFutureAsDeferredAwaitThrowsCancellation() = runTest { + val future = Futures.immediateCancelledFuture<Int>() + val asDeferred = future.asDeferred() + + assertTrue(asDeferred.isCancelled) + assertFailsWith<CancellationException> { + val value: Int = asDeferred.await() + } + } + + @Test + fun testCancelledFutureAsDeferredJoinDoesNotThrow() = runTest { + val future = Futures.immediateCancelledFuture<Int>() + val asDeferred = future.asDeferred() + + assertTrue(asDeferred.isCancelled) + assertTrue(asDeferred.isCompleted) + // By documentation, join() shouldn't throw when asDeferred is already complete. + asDeferred.join() + assertThat( + asDeferred.getCompletionExceptionOrNull(), + IsInstanceOf(CancellationException::class.java)) + } + + @Test fun testCompletedFutureAsDeferred() = runTest { val future = SettableFuture.create<Int>() val task = async { @@ -241,6 +425,22 @@ class ListenableFutureTest : TestBase() { } @Test + fun testFutureCompletedWithNullAsDeferred() = runTest { + val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) + val future = executor.submit(Callable { null }) + val deferred = GlobalScope.async { + future.asDeferred().await() + } + + try { + deferred.await() + expectUnreached() + } catch (e: Throwable) { + assertTrue(e is KotlinNullPointerException) + } + } + + @Test fun testThrowingFutureAsDeferred() = runTest { val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) val future = executor.submit(Callable { throw TestException() }) |