diff options
Diffstat (limited to 'kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt')
-rw-r--r-- | kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt | 67 |
1 files changed, 42 insertions, 25 deletions
diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt index 2874e7d5..c689a381 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt @@ -46,40 +46,49 @@ internal class DispatchedContinuation<in T>( * 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable], * [CancellableContinuationImpl.getResult] will check for cancellation later. * - * [REUSABLE_CLAIMED] state is required to prevent the lost resume in the channel. - * AbstractChannel.receive method relies on the fact that the following pattern + * [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation. + * In the `getResult`, we have the following code: * ``` - * suspendCancellableCoroutineReusable { cont -> - * val result = pollFastPath() - * if (result != null) cont.resume(result) + * if (trySuspend()) { + * // <- at this moment current continuation can be redispatched and claimed again. + * attachChildToParent() + * releaseClaimedContinuation() * } * ``` - * always succeeds. - * To make it always successful, we actually postpone "reusable" cancellation - * to this phase and set cancellation only at the moment of instantiation. */ private val _reusableCancellableContinuation = atomic<Any?>(null) - public val reusableCancellableContinuation: CancellableContinuationImpl<*>? + private val reusableCancellableContinuation: CancellableContinuationImpl<*>? get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*> - public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean { + fun isReusable(): Boolean { /* + Invariant: caller.resumeMode.isReusableMode * Reusability control: - * `null` -> no reusability at all, false - * If current state is not CCI, then we are within `suspendCancellableCoroutineReusable`, true - * Else, if result is CCI === requester. - * Identity check my fail for the following pattern: - * ``` - * loop: - * suspendCancellableCoroutineReusable { } // Reusable, outer coroutine stores the child handle - * suspendCancellableCoroutine { } // **Not reusable**, handle should be disposed after {}, otherwise - * it will leak because it won't be freed by `releaseInterceptedContinuation` - * ``` + * `null` -> no reusability at all, `false` + * anything else -> reusable. */ - val value = _reusableCancellableContinuation.value ?: return false - if (value is CancellableContinuationImpl<*>) return value === requester - return true + return _reusableCancellableContinuation.value != null + } + + /** + * Awaits until previous call to `suspendCancellableCoroutineReusable` will + * stop mutating cached instance + */ + fun awaitReusability() { + _reusableCancellableContinuation.loop { + if (it !== REUSABLE_CLAIMED) return + } + } + + fun release() { + /* + * Called from `releaseInterceptedContinuation`, can be concurrent with + * the code in `getResult` right after `trySuspend` returned `true`, so we have + * to wait for a release here. + */ + awaitReusability() + reusableCancellableContinuation?.detachChild() } /** @@ -103,11 +112,20 @@ internal class DispatchedContinuation<in T>( _reusableCancellableContinuation.value = REUSABLE_CLAIMED return null } + // potentially competing with cancel state is CancellableContinuationImpl<*> -> { if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) { return state as CancellableContinuationImpl<T> } } + state === REUSABLE_CLAIMED -> { + // Do nothing, wait until reusable instance will be returned from + // getResult() of a previous `suspendCancellableCoroutineReusable` + } + state is Throwable -> { + // Also do nothing, Throwable can only indicate that the CC + // is in REUSABLE_CLAIMED state, but with postponed cancellation + } else -> error("Inconsistent state $state") } } @@ -127,14 +145,13 @@ internal class DispatchedContinuation<in T>( * * See [CancellableContinuationImpl.getResult]. */ - fun checkPostponedCancellation(continuation: CancellableContinuation<*>): Throwable? { + fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? { _reusableCancellableContinuation.loop { state -> // not when(state) to avoid Intrinsics.equals call when { state === REUSABLE_CLAIMED -> { if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null } - state === null -> return null state is Throwable -> { require(_reusableCancellableContinuation.compareAndSet(state, null)) return state |