aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVsevolod Tolstopyatov <qwwdfsad@gmail.com>2021-03-24 11:51:12 +0300
committerGitHub <noreply@github.com>2021-03-24 11:51:12 +0300
commite608dfbb950831a865da7580b0f9b2b0dcfe7667 (patch)
tree2159758ead917e9bcf852655d77ece23e7c0365a
parent5f9e52c5644976dfb9e2e31bde573682c06e6ff9 (diff)
downloadplatform_external_kotlinx.coroutines-e608dfbb950831a865da7580b0f9b2b0dcfe7667.tar.gz
platform_external_kotlinx.coroutines-e608dfbb950831a865da7580b0f9b2b0dcfe7667.tar.bz2
platform_external_kotlinx.coroutines-e608dfbb950831a865da7580b0f9b2b0dcfe7667.zip
Rework reusability control in cancellable continuation (#2581)
* Rework reusability control in cancellable continuation * Update initCancellability documentation and implementation to be aligned with current invariants * Make parentHandle non-volatile and ensure there are no races around it * Establish new reusability invariants - Reusable continuation can be used _only_ if it states is not REUSABLE_CLAIMED - If it is, spin-loop and wait for release - Now the parent is attached to reusable continuation only if it was suspended at least once. Otherwise, the state machine can return via fast-path and no one will be able to release intercepted continuation (-> detach from parent) - It implies that the parent is attached after trySuspend call and can be concurrently reused, this is where new invariant comes into play * Leverage the fact that it's non-atomic and do not check it for cancellation prematurely. It increases the performance of fast-path, but potentially affects rare cancellation cases Fixes #2564
-rw-r--r--kotlinx-coroutines-core/common/src/CancellableContinuation.kt8
-rw-r--r--kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt116
-rw-r--r--kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt7
-rw-r--r--kotlinx-coroutines-core/common/src/JobSupport.kt2
-rw-r--r--kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt51
-rw-r--r--kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/test/FieldWalker.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt41
8 files changed, 167 insertions, 66 deletions
diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt
index 8f589912..b133b793 100644
--- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt
+++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt
@@ -104,8 +104,10 @@ public interface CancellableContinuation<in T> : Continuation<T> {
public fun completeResume(token: Any)
/**
- * Legacy function that turned on cancellation behavior in [suspendCancellableCoroutine] before kotlinx.coroutines 1.1.0.
- * This function does nothing and is left only for binary compatibility with old compiled code.
+ * Internal function that setups cancellation behavior in [suspendCancellableCoroutine].
+ * It's illegal to call this function in any non-`kotlinx.coroutines` code and
+ * such calls lead to undefined behaviour.
+ * Exposed in our ABI since 1.0.0 withing `suspendCancellableCoroutine` body.
*
* @suppress **This is unstable API and it is subject to change.**
*/
@@ -332,7 +334,7 @@ internal suspend inline fun <T> suspendCancellableCoroutineReusable(
internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
// If used outside of our dispatcher
if (delegate !is DispatchedContinuation<T>) {
- return CancellableContinuationImpl(delegate, MODE_CANCELLABLE_REUSABLE)
+ return CancellableContinuationImpl(delegate, MODE_CANCELLABLE)
}
/*
* Attempt to claim reusable instance.
diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
index 1a8f3566..c310623c 100644
--- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
+++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
@@ -72,10 +72,7 @@ internal open class CancellableContinuationImpl<in T>(
*/
private val _state = atomic<Any?>(Active)
- private val _parentHandle = atomic<DisposableHandle?>(null)
- private var parentHandle: DisposableHandle?
- get() = _parentHandle.value
- set(value) { _parentHandle.value = value }
+ private var parentHandle: DisposableHandle? = null
internal val state: Any? get() = _state.value
@@ -93,7 +90,21 @@ internal open class CancellableContinuationImpl<in T>(
}
public override fun initCancellability() {
- setupCancellation()
+ /*
+ * Invariant: at the moment of invocation, `this` has not yet
+ * leaked to user code and no one is able to invoke `resume` or `cancel`
+ * on it yet. Also, this function is not invoked for reusable continuations.
+ */
+ val handle = installParentHandle()
+ ?: return // fast path -- don't do anything without parent
+ // now check our state _after_ registering, could have completed while we were registering,
+ // but only if parent was cancelled. Parent could be in a "cancelling" state for a while,
+ // so we are helping it and cleaning the node ourselves
+ if (isCompleted) {
+ // Can be invoked concurrently in 'parentCancelled', no problems here
+ handle.dispose()
+ parentHandle = NonDisposableHandle
+ }
}
private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this)
@@ -118,40 +129,6 @@ internal open class CancellableContinuationImpl<in T>(
return true
}
- /**
- * Setups parent cancellation and checks for postponed cancellation in the case of reusable continuations.
- * It is only invoked from an internal [getResult] function for reusable continuations
- * and from [suspendCancellableCoroutine] to establish a cancellation before registering CC anywhere.
- */
- private fun setupCancellation() {
- if (checkCompleted()) return
- if (parentHandle !== null) return // fast path 2 -- was already initialized
- val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent
- val handle = parent.invokeOnCompletion(
- onCancelling = true,
- handler = ChildContinuation(this).asHandler
- )
- parentHandle = handle
- // now check our state _after_ registering (could have completed while we were registering)
- // Also note that we do not dispose parent for reusable continuations, dispatcher will do that for us
- if (isCompleted && !isReusable()) {
- handle.dispose() // it is Ok to call dispose twice -- here and in disposeParentHandle
- parentHandle = NonDisposableHandle // release it just in case, to aid GC
- }
- }
-
- private fun checkCompleted(): Boolean {
- val completed = isCompleted
- if (!resumeMode.isReusableMode) return completed // Do not check postponed cancellation for non-reusable continuations
- val dispatched = delegate as? DispatchedContinuation<*> ?: return completed
- val cause = dispatched.checkPostponedCancellation(this) ?: return completed
- if (!completed) {
- // Note: this cancel may fail if one more concurrent cancel is currently being invoked
- cancel(cause)
- }
- return true
- }
-
public override val callerFrame: CoroutineStackFrame?
get() = delegate as? CoroutineStackFrame
@@ -188,7 +165,9 @@ internal open class CancellableContinuationImpl<in T>(
*/
private fun cancelLater(cause: Throwable): Boolean {
if (!resumeMode.isReusableMode) return false
- val dispatched = (delegate as? DispatchedContinuation<*>) ?: return false
+ // Ensure that we are postponing cancellation to the right instance
+ if (!isReusable()) return false
+ val dispatched = delegate as DispatchedContinuation<*>
return dispatched.postponeCancellation(cause)
}
@@ -216,7 +195,7 @@ internal open class CancellableContinuationImpl<in T>(
private inline fun callCancelHandlerSafely(block: () -> Unit) {
try {
- block()
+ block()
} catch (ex: Throwable) {
// Handler should never fail, if it does -- it is an unhandled exception
handleCoroutineException(
@@ -276,9 +255,33 @@ internal open class CancellableContinuationImpl<in T>(
@PublishedApi
internal fun getResult(): Any? {
- setupCancellation()
- if (trySuspend()) return COROUTINE_SUSPENDED
+ val isReusable = isReusable()
+ // trySuspend may fail either if 'block' has resumed/cancelled a continuation
+ // or we got async cancellation from parent.
+ if (trySuspend()) {
+ /*
+ * We were neither resumed nor cancelled, time to suspend.
+ * But first we have to install parent cancellation handle (if we didn't yet),
+ * so CC could be properly resumed on parent cancellation.
+ */
+ if (parentHandle == null) {
+ installParentHandle()
+ }
+ /*
+ * Release the continuation after installing the handle (if needed).
+ * If we were successful, then do nothing, it's ok to reuse the instance now.
+ * Otherwise, dispose the handle by ourselves.
+ */
+ if (isReusable) {
+ releaseClaimedReusableContinuation()
+ }
+ return COROUTINE_SUSPENDED
+ }
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
+ if (isReusable) {
+ // release claimed reusable continuation for the future reuse
+ releaseClaimedReusableContinuation()
+ }
val state = this.state
if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
// if the parent job was already cancelled, then throw the corresponding cancellation exception
@@ -296,6 +299,28 @@ internal open class CancellableContinuationImpl<in T>(
return getSuccessfulResult(state)
}
+ private fun installParentHandle(): DisposableHandle? {
+ val parent = context[Job] ?: return null // don't do anything without a parent
+ // Install the handle
+ val handle = parent.invokeOnCompletion(
+ onCancelling = true,
+ handler = ChildContinuation(this).asHandler
+ )
+ parentHandle = handle
+ return handle
+ }
+
+ /**
+ * Tries to release reusable continuation. It can fail is there was an asynchronous cancellation,
+ * in which case it detaches from the parent and cancels this continuation.
+ */
+ private fun releaseClaimedReusableContinuation() {
+ // Cannot be casted if e.g. invoked from `installParentHandleReusable` for context without dispatchers, but with Job in it
+ val cancellationCause = (delegate as? DispatchedContinuation<*>)?.tryReleaseClaimedContinuation(this) ?: return
+ detachChild()
+ cancel(cancellationCause)
+ }
+
override fun resumeWith(result: Result<T>) =
resumeImpl(result.toState(this), resumeMode)
@@ -462,11 +487,10 @@ internal open class CancellableContinuationImpl<in T>(
/**
* Detaches from the parent.
- * Invariant: used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true`
*/
internal fun detachChild() {
- val handle = parentHandle
- handle?.dispose()
+ val handle = parentHandle ?: return
+ handle.dispose()
parentHandle = NonDisposableHandle
}
diff --git a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
index b2b88798..10be4a3d 100644
--- a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
+++ b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
@@ -101,7 +101,12 @@ public abstract class CoroutineDispatcher :
@InternalCoroutinesApi
public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
- (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
+ /*
+ * Unconditional cast is safe here: we only return DispatchedContinuation from `interceptContinuation`,
+ * any ClassCastException can only indicate compiler bug
+ */
+ val dispatched = continuation as DispatchedContinuation<*>
+ dispatched.release()
}
/**
diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt
index a7dbbf8b..438e7f57 100644
--- a/kotlinx-coroutines-core/common/src/JobSupport.kt
+++ b/kotlinx-coroutines-core/common/src/JobSupport.kt
@@ -1228,6 +1228,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* thrown and not a JobCancellationException.
*/
val cont = AwaitContinuation(uCont.intercepted(), this)
+ // we are mimicking suspendCancellableCoroutine here and call initCancellability, too.
+ cont.initCancellability()
cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler))
cont.getResult()
}
diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt
index 2874e7d5..45b9699c 100644
--- a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt
+++ b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt
@@ -46,17 +46,15 @@ internal class DispatchedContinuation<in T>(
* 4) [Throwable] continuation was cancelled with this cause while being in [suspendCancellableCoroutineReusable],
* [CancellableContinuationImpl.getResult] will check for cancellation later.
*
- * [REUSABLE_CLAIMED] state is required to prevent the lost resume in the channel.
- * AbstractChannel.receive method relies on the fact that the following pattern
+ * [REUSABLE_CLAIMED] state is required to prevent double-use of the reused continuation.
+ * In the `getResult`, we have the following code:
* ```
- * suspendCancellableCoroutineReusable { cont ->
- * val result = pollFastPath()
- * if (result != null) cont.resume(result)
+ * if (trySuspend()) {
+ * // <- at this moment current continuation can be redispatched and claimed again.
+ * attachChildToParent()
+ * releaseClaimedContinuation()
* }
* ```
- * always succeeds.
- * To make it always successful, we actually postpone "reusable" cancellation
- * to this phase and set cancellation only at the moment of instantiation.
*/
private val _reusableCancellableContinuation = atomic<Any?>(null)
@@ -66,9 +64,9 @@ internal class DispatchedContinuation<in T>(
public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean {
/*
* Reusability control:
- * `null` -> no reusability at all, false
+ * `null` -> no reusability at all, `false`
* If current state is not CCI, then we are within `suspendCancellableCoroutineReusable`, true
- * Else, if result is CCI === requester.
+ * Else, if result is CCI === requester, then it's our reusable continuation
* Identity check my fail for the following pattern:
* ```
* loop:
@@ -82,6 +80,27 @@ internal class DispatchedContinuation<in T>(
return true
}
+
+ /**
+ * Awaits until previous call to `suspendCancellableCoroutineReusable` will
+ * stop mutating cached instance
+ */
+ public fun awaitReusability() {
+ _reusableCancellableContinuation.loop { it ->
+ if (it !== REUSABLE_CLAIMED) return
+ }
+ }
+
+ public fun release() {
+ /*
+ * Called from `releaseInterceptedContinuation`, can be concurrent with
+ * the code in `getResult` right after `trySuspend` returned `true`, so we have
+ * to wait for a release here.
+ */
+ awaitReusability()
+ reusableCancellableContinuation?.detachChild()
+ }
+
/**
* Claims the continuation for [suspendCancellableCoroutineReusable] block,
* so all cancellations will be postponed.
@@ -103,11 +122,20 @@ internal class DispatchedContinuation<in T>(
_reusableCancellableContinuation.value = REUSABLE_CLAIMED
return null
}
+ // potentially competing with cancel
state is CancellableContinuationImpl<*> -> {
if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) {
return state as CancellableContinuationImpl<T>
}
}
+ state === REUSABLE_CLAIMED -> {
+ // Do nothing, wait until reusable instance will be returned from
+ // getResult() of a previous `suspendCancellableCoroutineReusable`
+ }
+ state is Throwable -> {
+ // Also do nothing, Throwable can only indicate that the CC
+ // is in REUSABLE_CLAIMED state, but with postponed cancellation
+ }
else -> error("Inconsistent state $state")
}
}
@@ -127,14 +155,13 @@ internal class DispatchedContinuation<in T>(
*
* See [CancellableContinuationImpl.getResult].
*/
- fun checkPostponedCancellation(continuation: CancellableContinuation<*>): Throwable? {
+ fun tryReleaseClaimedContinuation(continuation: CancellableContinuation<*>): Throwable? {
_reusableCancellableContinuation.loop { state ->
// not when(state) to avoid Intrinsics.equals call
when {
state === REUSABLE_CLAIMED -> {
if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null
}
- state === null -> return null
state is Throwable -> {
require(_reusableCancellableContinuation.compareAndSet(state, null))
return state
diff --git a/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt b/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt
index 55c05c55..c7c2c04e 100644
--- a/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
@@ -52,4 +52,4 @@ class CancelledAwaitStressTest : TestBase() {
private fun keepMe(a: ByteArray) {
// does nothing, makes sure the variable is kept in state-machine
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt
index e8079ebd..c4232d6e 100644
--- a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt
+++ b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
@@ -56,7 +56,7 @@ object FieldWalker {
* Reflectively starts to walk through object graph and map to all the reached object to their path
* in from root. Use [showPath] do display a path if needed.
*/
- private fun walkRefs(root: Any?, rootStatics: Boolean): Map<Any, Ref> {
+ private fun walkRefs(root: Any?, rootStatics: Boolean): IdentityHashMap<Any, Ref> {
val visited = IdentityHashMap<Any, Ref>()
if (root == null) return visited
visited[root] = Ref.RootRef
diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt
new file mode 100644
index 00000000..8a20e084
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines
+
+import kotlinx.coroutines.channels.*
+import org.junit.Test
+import kotlin.test.*
+
+class ReusableCancellableContinuationLeakStressTest : TestBase() {
+
+ @Suppress("UnnecessaryVariable")
+ private suspend fun <T : Any> ReceiveChannel<T>.receiveBatch(): T {
+ val r = receive() // DO NOT MERGE LINES, otherwise TCE will kick in
+ return r
+ }
+
+ private val iterations = 100_000 * stressTestMultiplier
+
+ class Leak(val i: Int)
+
+ @Test // Simplified version of #2564
+ fun testReusableContinuationLeak() = runTest {
+ val channel = produce(capacity = 1) { // from the main thread
+ (0 until iterations).forEach {
+ send(Leak(it))
+ }
+ }
+
+ launch(Dispatchers.Default) {
+ repeat (iterations) {
+ val value = channel.receiveBatch()
+ assertEquals(it, value.i)
+ }
+ (channel as Job).join()
+
+ FieldWalker.assertReachableCount(0, coroutineContext.job, false) { it is Leak }
+ }
+ }
+}