aboutsummaryrefslogtreecommitdiffstats
path: root/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
blob: 53019c4bbe7c7e1c0ff29298b38844c266ac98a9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.guava

import com.google.common.util.concurrent.*
import com.google.common.util.concurrent.internal.*
import kotlinx.coroutines.*
import java.util.concurrent.*
import java.util.concurrent.CancellationException
import kotlin.coroutines.*

/**
 * Starts [block] in a new coroutine and returns a [ListenableFuture] pointing to its result.
 *
 * The coroutine is immediately started. Passing [CoroutineStart.LAZY] to [start] throws
 * [IllegalArgumentException], because Futures don't have a way to start lazily.
 *
 * When the created coroutine [isCompleted][Job.isCompleted], it will try to
 * *synchronously* complete the returned Future with the same outcome. This will
 * succeed, barring a race with external cancellation of returned [ListenableFuture].
 *
 * Cancellation is propagated bidirectionally.
 *
 * `CoroutineContext` is inherited from this [CoroutineScope]. Additional context elements can be
 * added/overlaid by passing [context].
 *
 * If the context does not have a [CoroutineDispatcher], nor any other [ContinuationInterceptor]
 * member, [Dispatchers.Default] is used.
 *
 * The parent job is inherited from this [CoroutineScope], and can be overridden by passing
 * a [Job] in [context].
 *
 * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging
 * facilities.
 *
 * Note that the error and cancellation semantics of [future] are _subtly different_ than [asListenableFuture]'s.
 * In particular, any exception that happens in the coroutine after returned future is
 * successfully cancelled will be passed to the [CoroutineExceptionHandler] from the [context].
 * See [ListenableFutureCoroutine] for details.
 *
 * @param context added overlaying [CoroutineScope.coroutineContext] to form the new context.
 * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
 * @param block the code to execute.
 */
public fun <T> CoroutineScope.future(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): ListenableFuture<T> {
    require(!start.isLazy) { "$start start is not supported" }
    val newContext = newCoroutineContext(context)
    val coroutine = ListenableFutureCoroutine<T>(newContext)
    coroutine.start(start, coroutine, block)
    return coroutine.future
}

/**
 * Returns a [Deferred] that is completed or failed by `this` [ListenableFuture].
 *
 * Completion is non-atomic between the two promises.
 *
 * Cancellation is propagated bidirectionally.
 *
 * When `this` `ListenableFuture` completes (either successfully or exceptionally) it will try to
 * complete the returned `Deferred` with the same value or exception. This will succeed, barring a
 * race with cancellation of the `Deferred`.
 *
 * When `this` `ListenableFuture` is [successfully cancelled][java.util.concurrent.Future.cancel],
 * it will cancel the returned `Deferred`.
 *
 * When the returned `Deferred` is [cancelled][Deferred.cancel], it will try to propagate the
 * cancellation to `this` `ListenableFuture`. Propagation will succeed, barring a race with the
 * `ListenableFuture` completing normally. This is the only case in which the returned `Deferred`
 * will complete with a different outcome than `this` `ListenableFuture`.
 */
public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
    /* This method creates very specific behaviour as it entangles the `Deferred` and
     * `ListenableFuture`. This behaviour is the best discovered compromise between the possible
     * states and interface contracts of a `Future` and the states of a `Deferred`. The specific
     * behaviour is described here.
     *
     * When `this` `ListenableFuture` is successfully cancelled - meaning
     * `ListenableFuture.cancel()` returned `true` - it will synchronously cancel the returned
     * `Deferred`. This can only race with cancellation of the returned `Deferred`, so the
     * `Deferred` will always be put into its "cancelling" state and (barring uncooperative
     * cancellation) _eventually_ reach its "cancelled" state when either promise is successfully
     * cancelled.
     *
     * When the returned `Deferred` is cancelled, `ListenableFuture.cancel()` will be synchronously
     * called on `this` `ListenableFuture`. This will attempt to cancel the `Future`, though
     * cancellation may not succeed and the `ListenableFuture` may complete in a non-cancelled
     * terminal state.
     *
     * The returned `Deferred` may receive and suppress the `true` return value from
     * `ListenableFuture.cancel()` when the task is cancelled via the `Deferred` reference to it.
     * This is unavoidable, so make sure no idempotent cancellation work is performed by a
     * reference-holder of the `ListenableFuture` task. The idempotent work won't get done if
     * cancellation was from the `Deferred` representation of the task.
     *
     * This is inherently a race. See `Future.cancel()` for a description of `Future` cancellation
     * semantics. See `Job` for a description of coroutine cancellation semantics.
     */
    // First, try the fast-fast error path for Guava ListenableFutures. This will save allocating an
    // Exception by using the same instance the Future created.
    if (this is InternalFutureFailureAccess) {
        val t: Throwable? = InternalFutures.tryInternalFastPathGetFailure(this)
        if (t != null) {
            return CompletableDeferred<T>().also {
                it.completeExceptionally(t)
            }
        }
    }

    // Second, try the fast path for a completed Future. The Future is known to be done, so get()
    // will not block, and thus it won't be interrupted. Calling getUninterruptibly() instead of
    // getDone() in this known-non-interruptible case saves the volatile read that getDone() uses to
    // handle interruption.
    if (isDone) {
        return try {
            CompletableDeferred(Uninterruptibles.getUninterruptibly(this))
        } catch (e: CancellationException) {
            CompletableDeferred<T>().also { it.cancel(e) }
        } catch (e: ExecutionException) {
            // ExecutionException is the only kind of exception that can be thrown from a gotten
            // Future. Anything else showing up here indicates a very fundamental bug in a
            // Future implementation.
            CompletableDeferred<T>().also { it.completeExceptionally(e.nonNullCause()) }
        }
    }

    // Finally, if this isn't done yet, attach a Listener that will complete the Deferred.
    val deferred = CompletableDeferred<T>()
    Futures.addCallback(this, object : FutureCallback<T> {
        override fun onSuccess(result: T?) {
            // Here we work with flexible types, so we unchecked cast to trick the type system
            @Suppress("UNCHECKED_CAST")
            deferred.complete(result as T)
        }

        override fun onFailure(t: Throwable) {
            deferred.completeExceptionally(t)
        }
    }, MoreExecutors.directExecutor())

    // ... And cancel the Future when the deferred completes. Since the return type of this method
    // is Deferred, the only interaction point from the caller is to cancel the Deferred. If this
    // completion handler runs before the Future is completed, the Deferred must have been
    // cancelled and should propagate its cancellation. If it runs after the Future is completed,
    // this is a no-op.
    deferred.invokeOnCompletion {
        cancel(false)
    }
    // Return hides the CompletableDeferred. This should prevent casting.
    return object : Deferred<T> by deferred {}
}

/**
 * Returns the cause from an [ExecutionException] thrown by a [Future.get] or similar.
 *
 * [ExecutionException] _always_ wraps a non-null cause when Future.get() throws. A Future cannot
 * fail without a non-null `cause`, because the only way a Future _can_ fail is an uncaught
 * [Exception].
 *
 * If this !! throws [NullPointerException], a Future is breaking its interface contract and losing
 * state - a serious fundamental bug.
 */
private fun ExecutionException.nonNullCause(): Throwable {
    return this.cause!!
}

/**
 * Returns a [ListenableFuture] that is completed or failed by `this` [Deferred].
 *
 * Completion is non-atomic between the two promises.
 *
 * When either promise successfully completes, it will attempt to synchronously complete its
 * counterpart with the same value. This will succeed barring a race with cancellation.
 *
 * When either promise completes with an Exception, it will attempt to synchronously complete its
 * counterpart with the same Exception. This will succeed barring a race with cancellation.
 *
 * Cancellation is propagated bidirectionally.
 *
 * When the returned [Future] is successfully cancelled - meaning [Future.cancel] returned true -
 * [Deferred.cancel] will be synchronously called on `this` [Deferred]. This will attempt to cancel
 * the `Deferred`, though cancellation may not succeed and the `Deferred` may complete in a
 * non-cancelled terminal state.
 *
 * When `this` `Deferred` reaches its "cancelled" state with a successful cancellation - meaning it
 * completes with [kotlinx.coroutines.CancellationException] - `this` `Deferred` will synchronously
 * cancel the returned `Future`. This can only race with cancellation of the returned `Future`, so
 * the returned `Future` will always _eventually_ reach its cancelled state when either promise is
 * successfully cancelled, for their different meanings of "successfully cancelled".
 *
 * This is inherently a race. See [Future.cancel] for a description of `Future` cancellation
 * semantics. See [Job] for a description of coroutine cancellation semantics. See
 * [JobListenableFuture.cancel] for greater detail on the overlapped cancellation semantics and
 * corner cases of this method.
 */
public fun <T> Deferred<T>.asListenableFuture(): ListenableFuture<T> {
    val listenableFuture = JobListenableFuture<T>(this)
    // This invokeOnCompletion completes the JobListenableFuture with the same result as `this` Deferred.
    // The JobListenableFuture may have completed earlier if it got cancelled! See JobListenableFuture.cancel().
    invokeOnCompletion { throwable ->
        if (throwable == null) {
            listenableFuture.complete(getCompleted())
        } else {
            listenableFuture.completeExceptionallyOrCancel(throwable)
        }
    }
    return listenableFuture
}

/**
 * Awaits completion of `this` [ListenableFuture] without blocking a thread.
 *
 * This suspend function is cancellable.
 *
 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
 * stops waiting for the future and immediately resumes with [CancellationException][kotlinx.coroutines.CancellationException].
 *
 * This method is intended to be used with one-shot Futures, so on coroutine cancellation, the Future is cancelled as well.
 * If cancelling the given future is undesired, use [Futures.nonCancellationPropagating] or
 * [kotlinx.coroutines.NonCancellable].
 */
public suspend fun <T> ListenableFuture<T>.await(): T {
    try {
        if (isDone) return Uninterruptibles.getUninterruptibly(this)
    } catch (e: ExecutionException) {
        // ExecutionException is the only kind of exception that can be thrown from a gotten
        // Future, other than CancellationException. Cancellation is propagated upward so that
        // the coroutine running this suspend function may process it.
        // Any other Exception showing up here indicates a very fundamental bug in a
        // Future implementation.
        throw e.nonNullCause()
    }

    return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
        addListener(
          ToContinuation(this, cont),
          MoreExecutors.directExecutor())
        cont.invokeOnCancellation {
            cancel(false)
        }
    }
}

/**
 * Propagates the outcome of [futureToObserve] to [continuation] on completion.
 *
 * Cancellation is propagated as cancelling the continuation. If [futureToObserve] completes
 * and fails, the cause of the Future will be propagated without a wrapping
 * [ExecutionException] when thrown.
 */
private class ToContinuation<T>(
    val futureToObserve: ListenableFuture<T>,
    val continuation: CancellableContinuation<T>
): Runnable {
    override fun run() {
        if (futureToObserve.isCancelled) {
            continuation.cancel()
        } else {
            try {
                continuation.resume(Uninterruptibles.getUninterruptibly(futureToObserve))
            } catch (e: ExecutionException) {
                // ExecutionException is the only kind of exception that can be thrown from a gotten
                // Future. Anything else showing up here indicates a very fundamental bug in a
                // Future implementation.
                continuation.resumeWithException(e.nonNullCause())
            }
        }
    }
}

/**
 * An [AbstractCoroutine] intended for use directly creating a [ListenableFuture] handle to
 * completion.
 *
 * If [future] is successfully cancelled, cancellation is propagated to `this` `Coroutine`.
 * By documented contract, a [Future] has been cancelled if
 * and only if its `isCancelled()` method returns true.
 *
 * Any error that occurs after successfully cancelling a [ListenableFuture] will be passed
 * to the [CoroutineExceptionHandler] from the context. The contract of [Future] does not permit
 * it to return an error after it is successfully cancelled.
 *
 * By calling [asListenableFuture] on a [Deferred], any error that occurs after successfully
 * cancelling the [ListenableFuture] representation of the [Deferred] will _not_ be passed to
 * the [CoroutineExceptionHandler]. Cancelling a [Deferred] places that [Deferred] in the
 * cancelling/cancelled states defined by [Job], which _can_ show the error. It's assumed that
 * the [Deferred] pointing to the task will be used to observe any error outcome occurring after
 * cancellation.
 *
 * This may be counterintuitive, but it maintains the error and cancellation contracts of both
 * the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point
 * to the same running task.
 */
private class ListenableFutureCoroutine<T>(
    context: CoroutineContext
) : AbstractCoroutine<T>(context) {

    // JobListenableFuture propagates external cancellation to `this` coroutine. See JobListenableFuture.
    @JvmField
    val future = JobListenableFuture<T>(this)

    override fun onCompleted(value: T) {
        future.complete(value)
    }

    override fun onCancelled(cause: Throwable, handled: Boolean) {
        if (!future.completeExceptionallyOrCancel(cause) && !handled) {
            // prevents loss of exception that was not handled by parent & could not be set to JobListenableFuture
            handleCoroutineException(context, cause)
        }
    }
}

/**
 * A [ListenableFuture] that delegates to an internal [SettableFuture], collaborating with it.
 *
 * This setup allows the returned [ListenableFuture] to maintain the following properties:
 *
 * - Correct implementation of [Future]'s happens-after semantics documented for [get], [isDone]
 *   and [isCancelled] methods
 * - Cancellation propagation both to and from [Deferred]
 * - Correct cancellation and completion semantics even when this [ListenableFuture] is combined
 *   with different concrete implementations of [ListenableFuture]
 *   - Fully correct cancellation and listener happens-after obeying [Future] and
 *     [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to achieve.
 *     The best way to be correct, especially given the fun corner cases from
 *     [AbstractFuture.setFuture], is to just use an [AbstractFuture].
 *   - To maintain sanity, this class implements [ListenableFuture] and uses an auxiliary [SettableFuture]
 *     around coroutine's result as a state engine to establish happens-after-completion. This
 *     could probably be compressed into one subclass of [AbstractFuture] to save an allocation, at the
 *     cost of the implementation's readability.
 */
private class JobListenableFuture<T>(private val jobToCancel: Job): ListenableFuture<T> {
    /**
     * Serves as a state machine for [Future] cancellation.
     *
     * [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and
     * cancellation semantics. By using that type, the [JobListenableFuture] can delegate its semantics to
     * `auxFuture.get()` the result in such a way that the `Deferred` is always complete when returned.
     *
     * To preserve Coroutine's [CancellationException], this future points to either `T` or [Cancelled].
     */
    private val auxFuture = SettableFuture.create<Any>()

    /**
     * `true` if [auxFuture.get][ListenableFuture.get] throws [ExecutionException].
     *
     * Note: this is eventually consistent with the state of [auxFuture].
     *
     * Unfortunately, there's no API to figure out if [ListenableFuture] throws [ExecutionException]
     * apart from calling [ListenableFuture.get] on it. To avoid unnecessary [ExecutionException] allocation
     * we use this field as an optimization.
     */
    private var auxFutureIsFailed: Boolean = false

    /**
     * When the attached coroutine [isCompleted][Job.isCompleted] successfully
     * its outcome should be passed to this method.
     *
     * This should succeed barring a race with external cancellation.
     */
    fun complete(result: T): Boolean = auxFuture.set(result)

    /**
     * When the attached coroutine [isCompleted][Job.isCompleted] [exceptionally][Job.isCancelled]
     * its outcome should be passed to this method.
     *
     * This method will map coroutine's exception into corresponding Future's exception.
     *
     * This should succeed barring a race with external cancellation.
     */
    // CancellationException is wrapped into `Cancelled` to preserve original cause and message.
    // All the other exceptions are delegated to SettableFuture.setException.
    fun completeExceptionallyOrCancel(t: Throwable): Boolean =
        if (t is CancellationException) auxFuture.set(Cancelled(t))
        else auxFuture.setException(t).also { if (it) auxFutureIsFailed = true }

    /**
     * Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to
     * [Job.isCancelled].
     *
     * When done, this Future is cancelled if its [auxFuture] is cancelled, or if [auxFuture]
     * contains [CancellationException].
     *
     * See [cancel].
     */
    override fun isCancelled(): Boolean {
        // This expression ensures that isCancelled() will *never* return true when isDone() returns false.
        // In the case that the deferred has completed with cancellation, completing `this`, its
        // reaching the "cancelled" state with a cause of CancellationException is treated as the
        // same thing as auxFuture getting cancelled. If the Job is in the "cancelling" state and
        // this Future hasn't itself been successfully cancelled, the Future will return
        // isCancelled() == false. This is the only discovered way to reconcile the two different
        // cancellation contracts.
        return auxFuture.isCancelled || isDone && !auxFutureIsFailed && try {
            Uninterruptibles.getUninterruptibly(auxFuture) is Cancelled
        } catch (e: CancellationException) {
            // `auxFuture` got cancelled right after `auxFuture.isCancelled` returned false.
            true
        } catch (e: ExecutionException) {
            // `auxFutureIsFailed` hasn't been updated yet.
            auxFutureIsFailed = true
            false
        }
    }

    /**
     * Waits for [auxFuture] to complete by blocking, then uses its `result`
     * to get the `T` value `this` [ListenableFuture] is pointing to or throw a [CancellationException].
     * This establishes happens-after ordering for completion of the entangled coroutine.
     *
     * [SettableFuture.get] can only throw [CancellationException] if it was cancelled externally.
     * Otherwise it returns [Cancelled] that encapsulates outcome of the entangled coroutine.
     *
     * [auxFuture] _must be complete_ in order for the [isDone] and [isCancelled] happens-after
     * contract of [Future] to be correctly followed.
     */
    override fun get(): T {
        return getInternal(auxFuture.get())
    }

    /** See [get()]. */
    override fun get(timeout: Long, unit: TimeUnit): T {
        return getInternal(auxFuture.get(timeout, unit))
    }

    /** See [get()]. */
    private fun getInternal(result: Any): T = if (result is Cancelled) {
        throw CancellationException().initCause(result.exception)
    } else {
        // We know that `auxFuture` can contain either `T` or `Cancelled`.
        @Suppress("UNCHECKED_CAST")
        result as T
    }

    override fun addListener(listener: Runnable, executor: Executor) {
        auxFuture.addListener(listener, executor)
    }

    override fun isDone(): Boolean {
        return auxFuture.isDone
    }

    /**
     * Tries to cancel [jobToCancel] if `this` future was cancelled. This is fundamentally racy.
     *
     * The call to `cancel()` will try to cancel [auxFuture]: if and only if cancellation of [auxFuture]
     * succeeds, [jobToCancel] will have its [Job.cancel] called.
     *
     * This arrangement means that [jobToCancel] _might not successfully cancel_, if the race resolves
     * in a particular way. [jobToCancel] may also be in its "cancelling" state while this
     * ListenableFuture is complete and cancelled.
     */
    override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
        // TODO: call jobToCancel.cancel() _before_ running the listeners.
        //  `auxFuture.cancel()` will execute auxFuture's listeners. This delays cancellation of
        //  `jobToCancel` until after auxFuture's listeners have already run.
        //  Consider moving `jobToCancel.cancel()` into [AbstractFuture.afterDone] when the API is finalized.
        return if (auxFuture.cancel(mayInterruptIfRunning)) {
            jobToCancel.cancel()
            true
        } else {
            false
        }
    }

    override fun toString(): String = buildString {
        append(super.toString())
        append("[status=")
        if (isDone) {
            try {
                when (val result = Uninterruptibles.getUninterruptibly(auxFuture)) {
                    is Cancelled -> append("CANCELLED, cause=[${result.exception}]")
                    else -> append("SUCCESS, result=[$result]")
                }
            } catch (e: CancellationException) {
                // `this` future was cancelled by `Future.cancel`. In this case there's no cause or message.
                append("CANCELLED")
            } catch (e: ExecutionException) {
                append("FAILURE, cause=[${e.cause}]")
            } catch (t: Throwable) {
                // Violation of Future's contract, should never happen.
                append("UNKNOWN, cause=[${t.javaClass} thrown from get()]")
            }
        } else {
            append("PENDING, delegate=[$auxFuture]")
        }
        append(']')
    }
}

/**
 * A wrapper for `Coroutine`'s [CancellationException].
 *
 * If the coroutine is _cancelled normally_, we want to show the reason of cancellation to the user. Unfortunately,
 * [SettableFuture] can't store the reason of cancellation. To mitigate this, we wrap cancellation exception into this
 * class and pass it into [SettableFuture.complete]. See implementation of [JobListenableFuture].
 */
private class Cancelled(@JvmField val exception: CancellationException)