aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTyson Henning <yorick@google.com>2019-07-16 13:19:17 -0700
committerRoman Elizarov <elizarov@gmail.com>2019-08-29 17:52:40 +0300
commit57cc36454da6851ecd2a00abe7cfd6ee6e06928f (patch)
tree97e8b2f59a2615a76889a5f6ac96f534e23d3c0c
parent95d88ab58b0008b7ed5b8c580902c4719da3e864 (diff)
downloadplatform_external_kotlinx.coroutines-57cc36454da6851ecd2a00abe7cfd6ee6e06928f.tar.gz
platform_external_kotlinx.coroutines-57cc36454da6851ecd2a00abe7cfd6ee6e06928f.tar.bz2
platform_external_kotlinx.coroutines-57cc36454da6851ecd2a00abe7cfd6ee6e06928f.zip
Repaired some of ListenableFuture.kt's cancellation corner cases.
This fixes: - Cancellation without an untrapped CancellationException propagating through a Callback; isCancelled() is the correct way to check for cancellation - Bidirectional propagation of cancellation through `asListenableFuture()` - The cause getting lost in the `asListenableFuture()` future when cancelling its `Deferred` parent with a cause This also: - Extensively documents the package and the contracts created by the promise-creating extension methods and `future()` - Uses `getUninterruptibly()` for speed - Uses `AbstractFuture` to make as certain as possible that `Future.cancel()` will return `true` at most once - Should clear up rare spooky race conditions around cancellation/interruption in hybrid Coroutines/Guava Futures codebases There are probably a few more interesting corner cases hiding in here, but this should be a good start improving the correctness of `.guava`'s adapters. This is a squash commit of kotlin/pr/1347, rebased on develop: - Incorporated first-round feedback. - Merged CancellationToCoroutine into ListenableFutureCoroutine to save an allocation. - Documented and tested for null completion of asDeferred()'s parent Future. - Renamed a cancellation test case for clarity of purpose. - Split asDeferred() documentation between KDoc/details - Implemented InternalFutures faster-fast path. Documented.
-rw-r--r--integration/kotlinx-coroutines-guava/src/ListenableFuture.kt486
-rw-r--r--integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt202
2 files changed, 596 insertions, 92 deletions
diff --git a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
index a924bb4a..de47c760 100644
--- a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
+++ b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
@@ -5,31 +5,40 @@
package kotlinx.coroutines.guava
import com.google.common.util.concurrent.*
-import kotlinx.coroutines.*
+import com.google.common.util.concurrent.internal.InternalFutureFailureAccess
+import com.google.common.util.concurrent.internal.InternalFutures
import java.util.concurrent.*
-import java.util.concurrent.CancellationException
import kotlin.coroutines.*
+import kotlinx.coroutines.*
+import java.util.concurrent.CancellationException
/**
- * Starts new coroutine and returns its results an an implementation of [ListenableFuture].
- * The running coroutine is cancelled when the resulting future is cancelled or otherwise completed.
+ * Starts [block] in a new coroutine and returns a [ListenableFuture] pointing to its result.
+ *
+ * The coroutine is immediately started. Passing [CoroutineStart.LAZY] to [start] throws
+ * [IllegalArgumentException], because Futures don't have a way to start lazily.
*
- * Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
- * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
- * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
- * with corresponding [coroutineContext] element.
+ * The created coroutine is cancelled when the resulting future completes successfully, fails, or
+ * is cancelled.
*
- * By default, the coroutine is immediately scheduled for execution.
- * Other options can be specified via `start` parameter. See [CoroutineStart] for details.
- * A value of [CoroutineStart.LAZY] is not supported
- * (since `ListenableFuture` framework does not provide the corresponding capability) and
- * produces [IllegalArgumentException].
+ * `CoroutineContext` is inherited from this [CoroutineScope]. Additional context elements can be
+ * added/overlaid by passing [context].
*
- * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
+ * If the context does not have a [CoroutineDispatcher], nor any other [ContinuationInterceptor]
+ * member, [Dispatchers.Default] is used.
*
- * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
+ * The parent job is inherited from this [CoroutineScope], and can be overridden by passing
+ * a [Job] in [context].
+ *
+ * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging
+ * facilities.
+ *
+ * Note that the error and cancellation semantics of [future] are _subtly different_ than
+ * [asListenableFuture]'s. See [ListenableFutureCoroutine] for details.
+ *
+ * @param context added overlaying [CoroutineScope.coroutineContext] to form the new context.
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
- * @param block the coroutine code.
+ * @param block the code to execute.
*/
public fun <T> CoroutineScope.future(
context: CoroutineContext = EmptyCoroutineContext,
@@ -38,81 +47,98 @@ public fun <T> CoroutineScope.future(
): ListenableFuture<T> {
require(!start.isLazy) { "$start start is not supported" }
val newContext = newCoroutineContext(context)
+ // TODO: It'd be nice not to leak this SettableFuture reference, which is easily blind-cast.
val future = SettableFuture.create<T>()
val coroutine = ListenableFutureCoroutine(newContext, future)
- Futures.addCallback(future, coroutine, MoreExecutors.directExecutor())
+ future.addListener(
+ coroutine,
+ MoreExecutors.directExecutor())
coroutine.start(start, coroutine, block)
return future
}
-private class ListenableFutureCoroutine<T>(
- context: CoroutineContext,
- private val future: SettableFuture<T>
-) : AbstractCoroutine<T>(context), FutureCallback<T> {
- /*
- * We register coroutine as callback to the future this coroutine completes.
- * But when future is cancelled externally, we'd like to cancel coroutine,
- * so we register on failure handler for this purpose
- */
- override fun onSuccess(result: T?) {
- // Do nothing
- }
-
- override fun onFailure(t: Throwable) {
- if (t is CancellationException) {
- cancel()
- }
- }
-
- override fun onCompleted(value: T) {
- future.set(value)
- }
-
- override fun onCancelled(cause: Throwable, handled: Boolean) {
- if (!future.setException(cause) && !handled) {
- // prevents loss of exception that was not handled by parent & could not be set to SettableFuture
- handleCoroutineException(context, cause)
- }
- }
-}
-
/**
- * Converts this deferred value to the instance of [ListenableFuture].
- * The deferred value is cancelled when the resulting future is cancelled or otherwise completed.
+ * Returns a [Deferred] that is completed or failed by `this` [ListenableFuture].
+ *
+ * Completion is non-atomic between the two promises.
+ *
+ * Cancellation is propagated bidirectionally.
+ *
+ * When `this` `ListenableFuture` completes (either successfully or exceptionally) it will try to
+ * complete the returned `Deferred` with the same value or exception. This will succeed, barring a
+ * race with cancellation of the `Deferred`.
+ *
+ * When `this` `ListenableFuture` is [successfully cancelled][java.util.concurrent.Future.cancel],
+ * it will cancel the returned `Deferred`.
+ *
+ * When the returned `Deferred` is [cancelled][Deferred.cancel()], it will try to propagate the
+ * cancellation to `this` `ListenableFuture`. Propagation will succeed, barring a race with the
+ * `ListenableFuture` completing normally. This is the only case in which the returned `Deferred`
+ * will complete with a different outcome than `this` `ListenableFuture`.
*/
-public fun <T> Deferred<T>.asListenableFuture(): ListenableFuture<T> = DeferredListenableFuture<T>(this)
-
-private class DeferredListenableFuture<T>(
- private val deferred: Deferred<T>
-) : AbstractFuture<T>() {
- init {
- deferred.invokeOnCompletion {
- try {
- set(deferred.getCompleted())
- } catch (t: Throwable) {
- setException(t)
+public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
+ /* This method creates very specific behaviour as it entangles the `Deferred` and
+ * `ListenableFuture`. This behaviour is the best discovered compromise between the possible
+ * states and interface contracts of a `Future` and the states of a `Deferred`. The specific
+ * behaviour is described here.
+ *
+ * When `this` `ListenableFuture` is successfully cancelled - meaning
+ * `ListenableFuture.cancel()` returned `true` - it will synchronously cancel the returned
+ * `Deferred`. This can only race with cancellation of the returned `Deferred`, so the
+ * `Deferred` will always be put into its "cancelling" state and (barring uncooperative
+ * cancellation) _eventually_ reach its "cancelled" state when either promise is successfully
+ * cancelled.
+ *
+ * When the returned `Deferred` is cancelled, `ListenableFuture.cancel()` will be synchronously
+ * called on `this` `ListenableFuture`. This will attempt to cancel the `Future`, though
+ * cancellation may not succeed and the `ListenableFuture` may complete in a non-cancelled
+ * terminal state.
+ *
+ * The returned `Deferred` may receive and suppress the `true` return value from
+ * `ListenableFuture.cancel()` when the task is cancelled via the `Deferred` reference to it.
+ * This is unavoidable, so make sure no idempotent cancellation work is performed by a
+ * reference-holder of the `ListenableFuture` task. The idempotent work won't get done if
+ * cancellation was from the `Deferred` representation of the task.
+ *
+ * This is inherently a race. See `Future.cancel()` for a description of `Future` cancellation
+ * semantics. See `Job` for a description of coroutine cancellation semantics.
+ */
+ // First, try the fast-fast error path for Guava ListenableFutures. This will save allocating an
+ // Exception by using the same instance the Future created.
+ if (this is InternalFutureFailureAccess) {
+ val t: Throwable? = InternalFutures.tryInternalFastPathGetFailure(this)
+ if (t != null) {
+ return CompletableDeferred<T>().also {
+ it.completeExceptionally(t)
}
}
}
- override fun interruptTask() { deferred.cancel() }
-}
-/**
- * Converts this listenable future to an instance of [Deferred].
- * It is cancelled when the resulting deferred is cancelled.
- */
-public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
- // Fast path if already completed
+ // Second, try the fast path for a completed Future. The Future is known to be done, so get()
+ // will not block, and thus it won't be interrupted. Calling getUninterruptibly() instead of
+ // getDone() in this known-non-interruptible case saves the volatile read that getDone() uses to
+ // handle interruption.
if (isDone) {
return try {
- @Suppress("UNCHECKED_CAST")
- CompletableDeferred(get() as T)
- } catch (e: Throwable) {
- // unwrap original cause from ExecutionException
- val original = (e as? ExecutionException)?.cause ?: e
- CompletableDeferred<T>().also { it.completeExceptionally(original) }
+ val value = Uninterruptibles.getUninterruptibly(this)
+ if (value == null) {
+ CompletableDeferred<T>().also {
+ it.completeExceptionally(KotlinNullPointerException())
+ }
+ } else {
+ CompletableDeferred(value)
+ }
+ } catch (e: CancellationException) {
+ CompletableDeferred<T>().also { it.cancel(e) }
+ } catch (e: ExecutionException) {
+ // ExecutionException is the only kind of exception that can be thrown from a gotten
+ // Future. Anything else showing up here indicates a very fundamental bug in a
+ // Future implementation.
+ CompletableDeferred<T>().also { it.completeExceptionally(e.nonNullCause()) }
}
}
+
+ // Finally, if this isn't done yet, attach a Listener that will complete the Deferred.
val deferred = CompletableDeferred<T>()
Futures.addCallback(this, object : FutureCallback<T> {
override fun onSuccess(result: T?) {
@@ -124,41 +150,319 @@ public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
}
}, MoreExecutors.directExecutor())
- deferred.invokeOnCompletion { cancel(false) }
+ // ... And cancel the Future when the deferred completes. Since the return type of this method
+ // is Deferred, the only interaction point from the caller is to cancel the Deferred. If this
+ // completion handler runs before the Future is completed, the Deferred must have been
+ // cancelled and should propagate its cancellation. If it runs after the Future is completed,
+ // this is a no-op.
+ deferred.invokeOnCompletion {
+ cancel(false)
+ }
return deferred
}
/**
- * Awaits for completion of the future without blocking a thread.
+ * Returns the cause from an [ExecutionException] thrown by a [Future.get] or similar.
+ *
+ * [ExecutionException] _always_ wraps a non-null cause when Future.get() throws. A Future cannot
+ * fail without a non-null `cause`, because the only way a Future _can_ fail is an uncaught
+ * [Exception].
+ *
+ * If this !! throws [NullPointerException], a Future is breaking its interface contract and losing
+ * state - a serious fundamental bug.
+ */
+private fun ExecutionException.nonNullCause(): Throwable {
+ return this.cause!!
+}
+
+/**
+ * Returns a [ListenableFuture] that is completed or failed by `this` [Deferred].
+ *
+ * Completion is non-atomic between the two promises.
+ *
+ * When either promise successfully completes, it will attempt to synchronously complete its
+ * counterpart with the same value. This will succeed barring a race with cancellation.
+ *
+ * When either promise completes with an Exception, it will attempt to synchronously complete its
+ * counterpart with the same Exception. This will succeed barring a race with cancellation.
+ *
+ * Cancellation is propagated bidirectionally.
+ *
+ * When the returned [Future] is successfully cancelled - meaning [Future.cancel] returned true -
+ * [Deferred.cancel] will be synchronously called on `this` [Deferred]. This will attempt to cancel
+ * the `Deferred`, though cancellation may not succeed and the `Deferred` may complete in a
+ * non-cancelled terminal state.
+ *
+ * When `this` `Deferred` reaches its "cancelled" state with a successful cancellation - meaning it
+ * completes with [kotlinx.coroutines.CancellationException] - `this` `Deferred` will synchronously
+ * cancel the returned `Future`. This can only race with cancellation of the returned `Future`, so
+ * the returned `Future` will always _eventually_ reach its cancelled state when either promise is
+ * successfully cancelled, for their different meanings of "successfully cancelled".
+ *
+ * This is inherently a race. See [Future.cancel] for a description of `Future` cancellation
+ * semantics. See [Job] for a description of coroutine cancellation semantics. See
+ * [DeferredListenableFuture.cancel] for greater detail on the overlapped cancellation semantics and
+ * corner cases of this method.
+ */
+public fun <T> Deferred<T>.asListenableFuture(): ListenableFuture<T> {
+ val outerFuture = OuterFuture<T>(this)
+ outerFuture.afterInit()
+ return outerFuture
+}
+
+/**
+ * Awaits completion of `this` [ListenableFuture] without blocking a thread.
+ *
+ * This suspend function is cancellable.
*
- * This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* stops waiting for the future and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
*
- * This method is intended to be used with one-shot futures, so on coroutine cancellation future is cancelled as well.
- * If cancelling given future is undesired, `future.asDeferred().await()` should be used instead.
+ * This method is intended to be used with one-shot Futures, so on coroutine cancellation, the Future is cancelled as well.
+ * If cancelling the given future is undesired, use [Futures.nonCancellationPropagating] or
+ * [kotlinx.coroutines.NonCancellable].
+ *
*/
public suspend fun <T> ListenableFuture<T>.await(): T {
try {
- if (isDone) return get() as T
+ if (isDone) return Uninterruptibles.getUninterruptibly(this)
} catch (e: ExecutionException) {
- throw e.cause ?: e // unwrap original cause from ExecutionException
+ // ExecutionException is the only kind of exception that can be thrown from a gotten
+ // Future, other than CancellationException. Cancellation is propagated upward so that
+ // the coroutine running this suspend function may process it.
+ // Any other Exception showing up here indicates a very fundamental bug in a
+ // Future implementation.
+ throw e.nonNullCause()
}
return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
- val callback = ContinuationCallback(cont)
- Futures.addCallback(this, callback, MoreExecutors.directExecutor())
+ addListener(
+ ToContinuation(this, cont),
+ MoreExecutors.directExecutor())
cont.invokeOnCancellation {
cancel(false)
- callback.cont = null // clear the reference to continuation from the future's callback
}
}
}
-private class ContinuationCallback<T>(
- @Volatile @JvmField var cont: Continuation<T>?
-) : FutureCallback<T> {
- @Suppress("UNCHECKED_CAST")
- override fun onSuccess(result: T?) { cont?.resume(result as T) }
- override fun onFailure(t: Throwable) { cont?.resumeWithException(t) }
+/**
+ * Propagates the outcome of [futureToObserve] to [continuation] on completion.
+ *
+ * Cancellation is propagated as cancelling the continuation. If [futureToObserve] completes
+ * and fails, the cause of the Future will be propagated without a wrapping
+ * [ExecutionException] when thrown.
+ */
+private class ToContinuation<T>(
+ val futureToObserve: ListenableFuture<T>,
+ val continuation: CancellableContinuation<T>
+): Runnable {
+ override fun run() {
+ if (futureToObserve.isCancelled) {
+ continuation.cancel()
+ } else {
+ try {
+ continuation.resumeWith(
+ Result.success(Uninterruptibles.getUninterruptibly(futureToObserve)))
+ } catch (e: ExecutionException) {
+ // ExecutionException is the only kind of exception that can be thrown from a gotten
+ // Future. Anything else showing up here indicates a very fundamental bug in a
+ // Future implementation.
+ continuation.resumeWithException(e.nonNullCause())
+ }
+ }
+ }
+}
+
+/**
+ * An [AbstractCoroutine] intended for use directly creating a [ListenableFuture] handle to
+ * completion.
+ *
+ * The code in the [Runnable] portion of the class is registered as a [ListenableFuture] callback.
+ * See [run] for details. Both types are implemented by this object to save an allocation.
+ */
+private class ListenableFutureCoroutine<T>(
+ context: CoroutineContext,
+ private val future: SettableFuture<T>
+) : AbstractCoroutine<T>(context), Runnable {
+
+ /**
+ * When registered as a [ListenableFuture] listener, cancels the returned [Coroutine] if
+ * [future] is successfully cancelled. By documented contract, a [Future] has been cancelled if
+ * and only if its `isCancelled()` method returns true.
+ *
+ * Any error that occurs after successfully cancelling a [ListenableFuture]
+ * created by submitting the returned object as a [Runnable] to an `Executor` will be passed
+ * to the [CoroutineExceptionHandler] from the context. The contract of [Future] does not permit
+ * it to return an error after it is successfully cancelled.
+ *
+ * By calling [asListenableFuture] on a [Deferred], any error that occurs after successfully
+ * cancelling the [ListenableFuture] representation of the [Deferred] will _not_ be passed to
+ * the [CoroutineExceptionHandler]. Cancelling a [Deferred] places that [Deferred] in the
+ * cancelling/cancelled states defined by [Job], which _can_ show the error. It's assumed that
+ * the [Deferred] pointing to the task will be used to observe any error outcome occurring after
+ * cancellation.
+ *
+ * This may be counterintuitive, but it maintains the error and cancellation contracts of both
+ * the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point
+ * to the same running task.
+ */
+ override fun run() {
+ if (future.isCancelled) {
+ cancel()
+ }
+ }
+
+ override fun onCompleted(value: T) {
+ future.set(value)
+ }
+
+ // TODO: This doesn't actually cancel the Future. There doesn't seem to be bidi cancellation?
+ override fun onCancelled(cause: Throwable, handled: Boolean) {
+ if (!future.setException(cause) && !handled) {
+ // prevents loss of exception that was not handled by parent & could not be set to SettableFuture
+ handleCoroutineException(context, cause)
+ }
+ }
+}
+
+/**
+ * A [ListenableFuture] that delegates to an internal [DeferredListenableFuture], collaborating with
+ * it.
+ *
+ * This setup allows the returned [ListenableFuture] to maintain the following properties:
+ *
+ * - Correct implementation of [Future]'s happens-after semantics documented for [get], [isDone]
+ * and [isCancelled] methods
+ * - Cancellation propagation both to and from [Deferred]
+ * - Correct cancellation and completion semantics even when this [ListenableFuture] is combined
+ * with different concrete implementations of [ListenableFuture]
+ * - Fully correct cancellation and listener happens-after obeying [Future] and
+ * [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to achieve.
+ * The best way to be correct, especially given the fun corner cases from
+ * [AsyncFuture.setAsync], is to just use an [AsyncFuture].
+ * - To maintain sanity, this class implements [ListenableFuture] and uses an inner [AsyncFuture]
+ * around its input [deferred] as a state engine to establish happens-after-completion. This
+ * could probably be compressed into one subclass of [AsyncFuture] to save an allocation, at the
+ * cost of the implementation's readability.
+ */
+private class OuterFuture<T>(private val deferred: Deferred<T>): ListenableFuture<T> {
+ val innerFuture = DeferredListenableFuture(deferred)
+
+ // Adding the listener after initialization resolves partial construction hairpin problem.
+ //
+ // This invokeOnCompletion completes the innerFuture as `deferred` does. The innerFuture may
+ // have completed earlier if it got cancelled! See DeferredListenableFuture.
+ fun afterInit() {
+ deferred.invokeOnCompletion {
+ innerFuture.complete()
+ }
+ }
+
+ /**
+ * Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to
+ * [Job.isCancelled].
+ *
+ * When done, this Future is cancelled if its innerFuture is cancelled, or if its delegate
+ * [deferred] is cancelled. Cancellation of [innerFuture] collaborates with this class.
+ *
+ * See [DeferredListenableFuture.cancel].
+ */
+ override fun isCancelled(): Boolean {
+ // This expression ensures that isCancelled() will *never* return true when isDone() returns false.
+ // In the case that the deferred has completed with cancellation, completing `this`, its
+ // reaching the "cancelled" state with a cause of CancellationException is treated as the
+ // same thing as innerFuture getting cancelled. If the Job is in the "cancelling" state and
+ // this Future hasn't itself been successfully cancelled, the Future will return
+ // isCancelled() == false. This is the only discovered way to reconcile the two different
+ // cancellation contracts.
+ return isDone
+ && (innerFuture.isCancelled
+ || deferred.getCompletionExceptionOrNull() is kotlinx.coroutines.CancellationException)
+ }
+
+ /**
+ * Waits for [innerFuture] to complete by blocking, then uses the [deferred] returned by that
+ * Future to get the `T` value `this` [ListenableFuture] is pointing to. This establishes
+ * happens-after ordering for completion of the [Deferred] input to [OuterFuture].
+ *
+ * `innerFuture` _must be complete_ in order for the [isDone] and [isCancelled] happens-after
+ * contract of [Future] to be correctly followed. If this method were to directly use
+ * _`this.deferred`_ instead of blocking on its `innerFuture`, the [Deferred] that this
+ * [ListenableFuture] is created from might be in an incomplete state when used by `get()`.
+ */
+ override fun get(): T {
+ return getInternal(innerFuture.get())
+ }
+
+ /** See [get()]. */
+ override fun get(timeout: Long, unit: TimeUnit): T {
+ return getInternal(innerFuture.get(timeout, unit))
+ }
+
+ /** See [get()]. */
+ private fun getInternal(deferred: Deferred<T>): T {
+ if (deferred.isCancelled) {
+ val exception = deferred.getCompletionExceptionOrNull()
+ if (exception is kotlinx.coroutines.CancellationException) {
+ throw exception
+ } else {
+ throw ExecutionException(exception)
+ }
+ } else {
+ return deferred.getCompleted()
+ }
+ }
+
+ override fun addListener(listener: Runnable, executor: Executor) {
+ innerFuture.addListener(listener, executor)
+ }
+
+ override fun isDone(): Boolean {
+ return innerFuture.isDone
+ }
+
+ override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
+ return innerFuture.cancel(mayInterruptIfRunning)
+ }
+}
+
+/**
+ * Holds a delegate deferred, and serves as a state machine for [Future] cancellation.
+ *
+ * [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and
+ * cancellation semantics. By using that type, the [OuterFuture] can delegate its semantics to
+ * _this_ `Future` `get()` the result in such a way that the `Deferred` is always complete when
+ * returned.
+ */
+private class DeferredListenableFuture<T>(
+ private val deferred: Deferred<T>
+) : AbstractFuture<Deferred<T>>() {
+
+ fun complete() {
+ set(deferred)
+ }
+
+ /**
+ * Tries to cancel the task. This is fundamentally racy.
+ *
+ * For any given call to `cancel()`, if [deferred] is already completed, the call will complete
+ * this Future with it, and fail to cancel. Otherwise, the
+ * call to `cancel()` will try to cancel this Future: if and only if cancellation of this
+ * succeeds, [deferred] will have its [Deferred.cancel] called.
+ *
+ * This arrangement means that [deferred] _might not successfully cancel_, if the race resolves
+ * in a particular way. [deferred] may also be in its "cancelling" state while this
+ * ListenableFuture is complete and cancelled.
+ *
+ * [OuterFuture] collaborates with this class to present a more cohesive picture and ensure
+ * that certain combinations of cancelled/cancelling states can't be observed.
+ */
+ override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
+ return if (super.cancel(mayInterruptIfRunning)) {
+ deferred.cancel()
+ true
+ } else {
+ false
+ }
+ }
}
diff --git a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
index cf82318a..1f96dfdb 100644
--- a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
+++ b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
@@ -6,12 +6,12 @@ package kotlinx.coroutines.guava
import com.google.common.util.concurrent.*
import kotlinx.coroutines.*
-import kotlinx.coroutines.CancellationException
import org.hamcrest.core.*
import org.junit.*
import org.junit.Assert.*
import org.junit.Test
import java.util.concurrent.*
+import java.util.concurrent.CancellationException
class ListenableFutureTest : TestBase() {
@Before
@@ -190,6 +190,122 @@ class ListenableFutureTest : TestBase() {
}
@Test
+ fun testFutureAwaitCancellationPropagatingToDeferred() = runTest {
+
+ val latch = CountDownLatch(1)
+ val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
+ val future = executor.submit(Callable { latch.await(); 42 })
+ val deferred = async {
+ expect(2)
+ future.await()
+ }
+ expect(1)
+ yield()
+ future.cancel(/*mayInterruptIfRunning=*/true)
+ expect(3)
+ latch.countDown()
+ deferred.join()
+ assertTrue(future.isCancelled)
+ assertTrue(deferred.isCancelled)
+ assertFailsWith<CancellationException> { future.get() }
+ finish(4)
+ }
+
+ @Test
+ fun testFutureAwaitCancellationPropagatingToDeferredNoInterruption() = runTest {
+
+ val latch = CountDownLatch(1)
+ val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
+ val future = executor.submit(Callable { latch.await(); 42 })
+ val deferred = async {
+ expect(2)
+ future.await()
+ }
+ expect(1)
+ yield()
+ future.cancel(/*mayInterruptIfRunning=*/false)
+ expect(3)
+ latch.countDown()
+ deferred.join()
+ assertTrue(future.isCancelled)
+ assertTrue(deferred.isCancelled)
+ assertFailsWith<CancellationException> { future.get() }
+ finish(4)
+ }
+
+ @Test
+ fun testAsListenableFutureCancellationPropagatingToDeferred() = runTest {
+ val latch = CountDownLatch(1)
+ val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
+ val future = executor.submit(Callable { latch.await(); 42 })
+ val deferred = async {
+ expect(2)
+ future.await()
+ }
+ val asListenableFuture = deferred.asListenableFuture()
+ expect(1)
+ yield()
+ asListenableFuture.cancel(/*mayInterruptIfRunning=*/true)
+ expect(3)
+ latch.countDown()
+ deferred.join()
+ assertTrue(future.isCancelled)
+ assertTrue(deferred.isCancelled)
+ assertTrue(asListenableFuture.isCancelled)
+ assertFailsWith<CancellationException> { future.get() }
+ finish(4)
+ }
+
+ @Test
+ fun testAsListenableFutureCancellationPropagatingToDeferredNoInterruption() = runTest {
+ val latch = CountDownLatch(1)
+ val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
+ val future = executor.submit(Callable { latch.await(); 42 })
+ val deferred = async {
+ expect(2)
+ future.await()
+ }
+ val asListenableFuture = deferred.asListenableFuture()
+ expect(1)
+ yield()
+ asListenableFuture.cancel(/*mayInterruptIfRunning=*/false)
+ expect(3)
+ latch.countDown()
+ deferred.join()
+ assertFailsWith<CancellationException> { asListenableFuture.get() }
+ assertTrue(future.isCancelled)
+ assertTrue(asListenableFuture.isCancelled)
+ assertTrue(deferred.isCancelled)
+ assertFailsWith<CancellationException> { future.get() }
+ finish(4)
+ }
+
+ @Test
+ fun testAsListenableFutureCancellationThroughSetFuture() = runTest {
+ val latch = CountDownLatch(1)
+ val future = SettableFuture.create<Void>()
+ val deferred = async {
+ expect(2)
+ future.await()
+ }
+ val asListenableFuture = deferred.asListenableFuture()
+ expect(1)
+ yield()
+ future.setFuture(Futures.immediateCancelledFuture())
+ expect(3)
+ latch.countDown()
+ deferred.join()
+ assertFailsWith<CancellationException> { asListenableFuture.get() }
+ // Future was not interrupted, but also wasn't blocking, so it will be successfully
+ // cancelled by its parent Coroutine.
+ assertTrue(future.isCancelled)
+ assertTrue(asListenableFuture.isCancelled)
+ assertTrue(deferred.isCancelled)
+ assertFailsWith<CancellationException> { future.get() }
+ finish(4)
+ }
+
+ @Test
fun testFutureCancellation() = runTest {
val future = awaitFutureWithCancel(true)
assertTrue(future.isCancelled)
@@ -198,6 +314,21 @@ class ListenableFutureTest : TestBase() {
}
@Test
+ fun testAsListenableDeferredCancellationCauseAndMessagePropagate() = runTest {
+ val deferred = CompletableDeferred<Int>()
+ val inputCancellationException = CancellationException("Foobar")
+ inputCancellationException.initCause(OutOfMemoryError("Foobaz"))
+ deferred.cancel(inputCancellationException)
+ val asFuture = deferred.asListenableFuture()
+
+ val outputCancellationException =
+ assertFailsWith<CancellationException> { asFuture.get() }
+ assertEquals(outputCancellationException.message, "Foobar")
+ assertTrue(outputCancellationException.cause is OutOfMemoryError)
+ assertEquals(outputCancellationException.cause?.message, "Foobaz")
+ }
+
+ @Test
fun testNoFutureCancellation() = runTest {
val future = awaitFutureWithCancel(false)
assertFalse(future.isCancelled)
@@ -206,6 +337,59 @@ class ListenableFutureTest : TestBase() {
}
@Test
+ fun testCancelledDeferredAsListenableFutureAwaitThrowsCancellation() = runTest {
+ val future = Futures.immediateCancelledFuture<Int>()
+ val asDeferred = future.asDeferred()
+ val asDeferredAsFuture = asDeferred.asListenableFuture()
+
+ assertTrue(asDeferredAsFuture.isCancelled)
+ assertFailsWith<CancellationException> {
+ val value: Int = asDeferredAsFuture.await()
+ }
+ }
+
+ @Test
+ fun testCancelledDeferredAsListenableFutureAsDeferredPassesCancellationAlong() = runTest {
+ val deferred = CompletableDeferred<Int>()
+ deferred.completeExceptionally(CancellationException())
+ val asFuture = deferred.asListenableFuture()
+ val asFutureAsDeferred = asFuture.asDeferred()
+
+ assertTrue(asFutureAsDeferred.isCancelled)
+ assertTrue(asFutureAsDeferred.isCompleted)
+ // By documentation, join() shouldn't throw when asDeferred is already complete.
+ asFutureAsDeferred.join()
+ assertThat(
+ asFutureAsDeferred.getCompletionExceptionOrNull(),
+ IsInstanceOf(CancellationException::class.java))
+ }
+
+ @Test
+ fun testCancelledFutureAsDeferredAwaitThrowsCancellation() = runTest {
+ val future = Futures.immediateCancelledFuture<Int>()
+ val asDeferred = future.asDeferred()
+
+ assertTrue(asDeferred.isCancelled)
+ assertFailsWith<CancellationException> {
+ val value: Int = asDeferred.await()
+ }
+ }
+
+ @Test
+ fun testCancelledFutureAsDeferredJoinDoesNotThrow() = runTest {
+ val future = Futures.immediateCancelledFuture<Int>()
+ val asDeferred = future.asDeferred()
+
+ assertTrue(asDeferred.isCancelled)
+ assertTrue(asDeferred.isCompleted)
+ // By documentation, join() shouldn't throw when asDeferred is already complete.
+ asDeferred.join()
+ assertThat(
+ asDeferred.getCompletionExceptionOrNull(),
+ IsInstanceOf(CancellationException::class.java))
+ }
+
+ @Test
fun testCompletedFutureAsDeferred() = runTest {
val future = SettableFuture.create<Int>()
val task = async {
@@ -241,6 +425,22 @@ class ListenableFutureTest : TestBase() {
}
@Test
+ fun testFutureCompletedWithNullAsDeferred() = runTest {
+ val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
+ val future = executor.submit(Callable { null })
+ val deferred = GlobalScope.async {
+ future.asDeferred().await()
+ }
+
+ try {
+ deferred.await()
+ expectUnreached()
+ } catch (e: Throwable) {
+ assertTrue(e is KotlinNullPointerException)
+ }
+ }
+
+ @Test
fun testThrowingFutureAsDeferred() = runTest {
val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
val future = executor.submit(Callable { throw TestException() })