diff options
author | Vsevolod Tolstopyatov <qwwdfsad@gmail.com> | 2021-03-24 11:51:12 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-03-24 11:51:12 +0300 |
commit | e608dfbb950831a865da7580b0f9b2b0dcfe7667 (patch) | |
tree | 2159758ead917e9bcf852655d77ece23e7c0365a | |
parent | 5f9e52c5644976dfb9e2e31bde573682c06e6ff9 (diff) | |
download | platform_external_kotlinx.coroutines-e608dfbb950831a865da7580b0f9b2b0dcfe7667.tar.gz platform_external_kotlinx.coroutines-e608dfbb950831a865da7580b0f9b2b0dcfe7667.tar.bz2 platform_external_kotlinx.coroutines-e608dfbb950831a865da7580b0f9b2b0dcfe7667.zip |
Rework reusability control in cancellable continuation (#2581)
* Rework reusability control in cancellable continuation
* Update initCancellability documentation and implementation to be aligned with current invariants
* Make parentHandle non-volatile and ensure there are no races around it
* Establish new reusability invariants
- Reusable continuation can be used _only_ if it states is not REUSABLE_CLAIMED
- If it is, spin-loop and wait for release
- Now the parent is attached to reusable continuation only if it was suspended at least once. Otherwise, the state machine can return via fast-path and no one will be able to release intercepted continuation (-> detach from parent)
- It implies that the parent is attached after trySuspend call and can be concurrently reused, this is where new invariant comes into play
* Leverage the fact that it's non-atomic and do not check it for cancellation prematurely. It increases the performance of fast-path, but potentially affects rare cancellation cases
Fixes #2564
8 files changed, 167 insertions, 66 deletions
diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt index 8f589912..b133b793 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt @@ -104,8 +104,10 @@ public interface CancellableContinuation<in T> : Continuation<T> { public fun completeResume(token: Any) /** - * Legacy function that turned on cancellation behavior in [suspendCancellableCoroutine] before kotlinx.coroutines 1.1.0. - * This function does nothing and is left only for binary compatibility with old compiled code. + * Internal function that setups cancellation behavior in [suspendCancellableCoroutine]. + * It's illegal to call this function in any non-`kotlinx.coroutines` code and + * such calls lead to undefined behaviour. + * Exposed in our ABI since 1.0.0 withing `suspendCancellableCoroutine` body. * * @suppress **This is unstable API and it is subject to change.** */ @@ -332,7 +334,7 @@ internal suspend inline fun <T> suspendCancellableCoroutineReusable( internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> { // If used outside of our dispatcher if (delegate !is DispatchedContinuation<T>) { - return CancellableContinuationImpl(delegate, MODE_CANCELLABLE_REUSABLE) + return CancellableContinuationImpl(delegate, MODE_CANCELLABLE) } /* * Attempt to claim reusable instance. diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 1a8f3566..c310623c 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -72,10 +72,7 @@ internal open class CancellableContinuationImpl<in T>( */ private val _state = atomic<Any?>(Active) - private val _parentHandle = atomic<DisposableHandle?>(null) - private var parentHandle: DisposableHandle? - get() = _parentHandle.value - set(value) { _parentHandle.value = value } + private var parentHandle: DisposableHandle? = null internal val state: Any? get() = _state.value @@ -93,7 +90,21 @@ internal open class CancellableContinuationImpl<in T>( } public override fun initCancellability() { - setupCancellation() + /* + * Invariant: at the moment of invocation, `this` has not yet + * leaked to user code and no one is able to invoke `resume` or `cancel` + * on it yet. Also, this function is not invoked for reusable continuations. + */ + val handle = installParentHandle() + ?: return // fast path -- don't do anything without parent + // now check our state _after_ registering, could have completed while we were registering, + // but only if parent was cancelled. Parent could be in a "cancelling" state for a while, + // so we are helping it and cleaning the node ourselves + if (isCompleted) { + // Can be invoked concurrently in 'parentCancelled', no problems here + handle.dispose() + parentHandle = NonDisposableHandle + } } private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this) @@ -118,40 +129,6 @@ internal open class CancellableContinuationImpl<in T>( return true } - /** - * Setups parent cancellation and checks for postponed cancellation in the case of reusable continuations. - * It is only invoked from an internal [getResult] function for reusable continuations - * and from [suspendCancellableCoroutine] to establish a cancellation before registering CC anywhere. - */ - private fun setupCancellation() { - if (checkCompleted()) return - if (parentHandle !== null) return // fast path 2 -- was already initialized - val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent - val handle = parent.invokeOnCompletion( - onCancelling = true, - handler = ChildContinuation(this).asHandler - ) - parentHandle = handle - // now check our state _after_ registering (could have completed while we were registering) - // Also note that we do not dispose parent for reusable continuations, dispatcher will do that for us - if (isCompleted && !isReusable()) { - handle.dispose() // it is Ok to call dispose twice -- here and in disposeParentHandle - parentHandle = NonDisposableHandle // release it just in case, to aid GC - } - } - - private fun checkCompleted(): Boolean { - val completed = isCompleted - if (!resumeMode.isReusableMode) return completed // Do not check postponed cancellation for non-reusable continuations - val dispatched = delegate as? DispatchedContinuation<*> ?: return completed - val cause = dispatched.checkPostponedCancellation(this) ?: return completed - if (!completed) { - // Note: this cancel may fail if one more concurrent cancel is currently being invoked - cancel(cause) - } - return true - } - public override val callerFrame: CoroutineStackFrame? get() = delegate as? CoroutineStackFrame @@ -188,7 +165,9 @@ internal open class CancellableContinuationImpl<in T>( */ private fun cancelLater(cause: Throwable): Boolean { if (!resumeMode.isReusableMode) return false - val dispatched = (delegate as? DispatchedContinuation<*>) ?: return false + // Ensure that we are postponing cancellation to the right instance + if (!isReusable()) return false + val dispatched = delegate as DispatchedContinuation<*> return dispatched.postponeCancellation(cause) } @@ -216,7 +195,7 @@ internal open class CancellableContinuationImpl<in T>( private inline fun callCancelHandlerSafely(block: () -> Unit) { try { - block() + block() } catch (ex: Throwable) { // Handler should never fail, if it does -- it is an unhandled exception handleCoroutineException( @@ -276,9 +255,33 @@ internal open class CancellableContinuationImpl<in T>( @PublishedApi internal fun getResult(): Any? { - setupCancellation() - if (trySuspend()) return COROUTINE_SUSPENDED + val isReusable = isReusable() + // trySuspend may fail either if 'block' has resumed/cancelled a continuation + // or we got async cancellation from parent. + if (trySuspend()) { + /* + * We were neither resumed nor cancelled, time to suspend. + * But first we have to install parent cancellation handle (if we didn't yet), + * so CC could be properly resumed on parent cancellation. + */ + if (parentHandle == null) { + installParentHandle() + } + /* + * Release the continuation after installing the handle (if needed). + * If we were successful, then do nothing, it's ok to reuse the instance now. + * Otherwise, dispose the handle by ourselves. + */ + if (isReusable) { + releaseClaimedReusableContinuation() + } + return COROUTINE_SUSPENDED + } // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state + if (isReusable) { + // release claimed reusable continuation for the future reuse + releaseClaimedReusableContinuation() + } val state = this.state if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this) // if the parent job was already cancelled, then throw the corresponding cancellation exception @@ -296,6 +299,28 @@ internal open class CancellableContinuationImpl<in T>( return getSuccessfulResult(state) } + private fun installParentHandle(): DisposableHandle? { + val parent = context[Job] ?: return null // don't do anything without a parent + // Install the handle + val handle = parent.invokeOnCompletion( + onCancelling = true, + handler = ChildContinuation(this).asHandler + ) + parentHandle = handle + return handle + } + + /** + * Tries to release reusable continuation. It can fail is there was an asynchronous cancellation, + * in which case it detaches from the parent and cancels this continuation. + */ + private fun releaseClaimedReusableContinuation() { + // Cannot be casted if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it + val cancellationCause = (delegate as? DispatchedContinuation<*>)?.tryReleaseClaimedContinuation(this) ?: return + detachChild() + cancel(cancellationCause) + } + override fun resumeWith(result: Result<T>) = resumeImpl(result.toState(this), resumeMode) @@ -462,11 +487,10 @@ internal open class CancellableContinuationImpl<in T>( /** * Detaches from the parent. - * Invariant: used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true` */ internal fun detachChild() { - val handle = parentHandle - handle?.dispose() + val handle = parentHandle ?: return + handle.dispose() parentHandle = NonDisposableHandle } diff --git a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt index b2b88798..10be4a3d 100644 --- a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt @@ -101,7 +101,12 @@ public abstract class CoroutineDispatcher : @InternalCoroutinesApi public override fun releaseInterceptedContinuation(continuation: Continuation<*>) { - (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild() + /* + * Unconditional cast is safe here: we only return DispatchedContinuation from `interceptContinuation`, + * any ClassCastException can only indicate compiler bug + */ + val dispatched = continuation as DispatchedContinuation<*> + dispatched.release() } /** diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index a7dbbf8b..438e7f57 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -1228,6 +1228,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren * thrown and not a JobCancellationException. */ val cont = AwaitContinuation(uCont.intercepted(), this) + // we are mimicking suspendCancellableCoroutine here and call initCancellability, too. + cont.initCancellability() cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler)) cont.getResult() } diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt index 2874e7d5..45b9699c 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt @@ -46,17 +46,15 @@ internal class DispatchedContinuation<in T>( * 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable], * [CancellableContinuationImpl.getResult] will check for cancellation later. * - * [REUSABLE_CLAIMED] state is required to prevent the lost resume in the channel. - * AbstractChannel.receive method relies on the fact that the following pattern + * [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation. + * In the `getResult`, we have the following code: * ``` - * suspendCancellableCoroutineReusable { cont -> - * val result = pollFastPath() - * if (result != null) cont.resume(result) + * if (trySuspend()) { + * // <- at this moment current continuation can be redispatched and claimed again. + * attachChildToParent() + * releaseClaimedContinuation() * } * ``` - * always succeeds. - * To make it always successful, we actually postpone "reusable" cancellation - * to this phase and set cancellation only at the moment of instantiation. */ private val _reusableCancellableContinuation = atomic<Any?>(null) @@ -66,9 +64,9 @@ internal class DispatchedContinuation<in T>( public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean { /* * Reusability control: - * `null` -> no reusability at all, false + * `null` -> no reusability at all, `false` * If current state is not CCI, then we are within `suspendCancellableCoroutineReusable`, true - * Else, if result is CCI === requester. + * Else, if result is CCI === requester, then it's our reusable continuation * Identity check my fail for the following pattern: * ``` * loop: @@ -82,6 +80,27 @@ internal class DispatchedContinuation<in T>( return true } + + /** + * Awaits until previous call to `suspendCancellableCoroutineReusable` will + * stop mutating cached instance + */ + public fun awaitReusability() { + _reusableCancellableContinuation.loop { it -> + if (it !== REUSABLE_CLAIMED) return + } + } + + public fun release() { + /* + * Called from `releaseInterceptedContinuation`, can be concurrent with + * the code in `getResult` right after `trySuspend` returned `true`, so we have + * to wait for a release here. + */ + awaitReusability() + reusableCancellableContinuation?.detachChild() + } + /** * Claims the continuation for [suspendCancellableCoroutineReusable] block, * so all cancellations will be postponed. @@ -103,11 +122,20 @@ internal class DispatchedContinuation<in T>( _reusableCancellableContinuation.value = REUSABLE_CLAIMED return null } + // potentially competing with cancel state is CancellableContinuationImpl<*> -> { if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) { return state as CancellableContinuationImpl<T> } } + state === REUSABLE_CLAIMED -> { + // Do nothing, wait until reusable instance will be returned from + // getResult() of a previous `suspendCancellableCoroutineReusable` + } + state is Throwable -> { + // Also do nothing, Throwable can only indicate that the CC + // is in REUSABLE_CLAIMED state, but with postponed cancellation + } else -> error("Inconsistent state $state") } } @@ -127,14 +155,13 @@ internal class DispatchedContinuation<in T>( * * See [CancellableContinuationImpl.getResult]. */ - fun checkPostponedCancellation(continuation: CancellableContinuation<*>): Throwable? { + fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? { _reusableCancellableContinuation.loop { state -> // not when(state) to avoid Intrinsics.equals call when { state === REUSABLE_CLAIMED -> { if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null } - state === null -> return null state is Throwable -> { require(_reusableCancellableContinuation.compareAndSet(state, null)) return state diff --git a/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt b/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt index 55c05c55..c7c2c04e 100644 --- a/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -52,4 +52,4 @@ class CancelledAwaitStressTest : TestBase() { private fun keepMe(a: ByteArray) { // does nothing, makes sure the variable is kept in state-machine } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt index e8079ebd..c4232d6e 100644 --- a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt +++ b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -56,7 +56,7 @@ object FieldWalker { * Reflectively starts to walk through object graph and map to all the reached object to their path * in from root. Use [showPath] do display a path if needed. */ - private fun walkRefs(root: Any?, rootStatics: Boolean): Map<Any, Ref> { + private fun walkRefs(root: Any?, rootStatics: Boolean): IdentityHashMap<Any, Ref> { val visited = IdentityHashMap<Any, Ref>() if (root == null) return visited visited[root] = Ref.RootRef diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt new file mode 100644 index 00000000..8a20e084 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.channels.* +import org.junit.Test +import kotlin.test.* + +class ReusableCancellableContinuationLeakStressTest : TestBase() { + + @Suppress("UnnecessaryVariable") + private suspend fun <T : Any> ReceiveChannel<T>.receiveBatch(): T { + val r = receive() // DO NOT MERGE LINES, otherwise TCE will kick in + return r + } + + private val iterations = 100_000 * stressTestMultiplier + + class Leak(val i: Int) + + @Test // Simplified version of #2564 + fun testReusableContinuationLeak() = runTest { + val channel = produce(capacity = 1) { // from the main thread + (0 until iterations).forEach { + send(Leak(it)) + } + } + + launch(Dispatchers.Default) { + repeat (iterations) { + val value = channel.receiveBatch() + assertEquals(it, value.i) + } + (channel as Job).join() + + FieldWalker.assertReachableCount(0, coroutineContext.job, false) { it is Leak } + } + } +} |