aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt')
-rw-r--r--kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt67
1 files changed, 42 insertions, 25 deletions
diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt
index 2874e7d5..c689a381 100644
--- a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt
+++ b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt
@@ -46,40 +46,49 @@ internal class DispatchedContinuation<in T>(
* 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable],
* [CancellableContinuationImpl.getResult] will check for cancellation later.
*
- * [REUSABLE_CLAIMED] state is required to prevent the lost resume in the channel.
- * AbstractChannel.receive method relies on the fact that the following pattern
+ * [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation.
+ * In the `getResult`, we have the following code:
* ```
- * suspendCancellableCoroutineReusable { cont ->
- * val result = pollFastPath()
- * if (result != null) cont.resume(result)
+ * if (trySuspend()) {
+ * // <- at this moment current continuation can be redispatched and claimed again.
+ * attachChildToParent()
+ * releaseClaimedContinuation()
* }
* ```
- * always succeeds.
- * To make it always successful, we actually postpone "reusable" cancellation
- * to this phase and set cancellation only at the moment of instantiation.
*/
private val _reusableCancellableContinuation = atomic<Any?>(null)
- public val reusableCancellableContinuation: CancellableContinuationImpl<*>?
+ private val reusableCancellableContinuation: CancellableContinuationImpl<*>?
get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*>
- public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean {
+ fun isReusable(): Boolean {
/*
+ Invariant: caller.resumeMode.isReusableMode
* Reusability control:
- * `null` -> no reusability at all, false
- * If current state is not CCI, then we are within `suspendCancellableCoroutineReusable`, true
- * Else, if result is CCI === requester.
- * Identity check my fail for the following pattern:
- * ```
- * loop:
- * suspendCancellableCoroutineReusable { } // Reusable, outer coroutine stores the child handle
- * suspendCancellableCoroutine { } // **Not reusable**, handle should be disposed after {}, otherwise
- * it will leak because it won't be freed by `releaseInterceptedContinuation`
- * ```
+ * `null` -> no reusability at all, `false`
+ * anything else -> reusable.
*/
- val value = _reusableCancellableContinuation.value ?: return false
- if (value is CancellableContinuationImpl<*>) return value === requester
- return true
+ return _reusableCancellableContinuation.value != null
+ }
+
+ /**
+ * Awaits until previous call to `suspendCancellableCoroutineReusable` will
+ * stop mutating cached instance
+ */
+ fun awaitReusability() {
+ _reusableCancellableContinuation.loop {
+ if (it !== REUSABLE_CLAIMED) return
+ }
+ }
+
+ fun release() {
+ /*
+ * Called from `releaseInterceptedContinuation`, can be concurrent with
+ * the code in `getResult` right after `trySuspend` returned `true`, so we have
+ * to wait for a release here.
+ */
+ awaitReusability()
+ reusableCancellableContinuation?.detachChild()
}
/**
@@ -103,11 +112,20 @@ internal class DispatchedContinuation<in T>(
_reusableCancellableContinuation.value = REUSABLE_CLAIMED
return null
}
+ // potentially competing with cancel
state is CancellableContinuationImpl<*> -> {
if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) {
return state as CancellableContinuationImpl<T>
}
}
+ state === REUSABLE_CLAIMED -> {
+ // Do nothing, wait until reusable instance will be returned from
+ // getResult() of a previous `suspendCancellableCoroutineReusable`
+ }
+ state is Throwable -> {
+ // Also do nothing, Throwable can only indicate that the CC
+ // is in REUSABLE_CLAIMED state, but with postponed cancellation
+ }
else -> error("Inconsistent state $state")
}
}
@@ -127,14 +145,13 @@ internal class DispatchedContinuation<in T>(
*
* See [CancellableContinuationImpl.getResult].
*/
- fun checkPostponedCancellation(continuation: CancellableContinuation<*>): Throwable? {
+ fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? {
_reusableCancellableContinuation.loop { state ->
// not when(state) to avoid Intrinsics.equals call
when {
state === REUSABLE_CLAIMED -> {
if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
}
- state === null -> return null
state is Throwable -> {
require(_reusableCancellableContinuation.compareAndSet(state, null))
return state