aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/jvm/src
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/jvm/src')
-rw-r--r--kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt (renamed from kotlinx-coroutines-core/jvm/src/TimeSource.kt)22
-rw-r--r--kotlinx-coroutines-core/jvm/src/Builders.kt3
-rw-r--r--kotlinx-coroutines-core/jvm/src/Dispatchers.kt7
-rw-r--r--kotlinx-coroutines-core/jvm/src/Executors.kt80
-rw-r--r--kotlinx-coroutines-core/jvm/src/Future.kt10
-rw-r--r--kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt41
-rw-r--r--kotlinx-coroutines-core/jvm/src/channels/Actor.kt12
-rw-r--r--kotlinx-coroutines-core/jvm/src/channels/Channels.kt79
-rw-r--r--kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt8
-rw-r--r--kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbes.kt22
-rw-r--r--kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt19
-rw-r--r--kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt5
-rw-r--r--kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt14
13 files changed, 216 insertions, 106 deletions
diff --git a/kotlinx-coroutines-core/jvm/src/TimeSource.kt b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt
index 8d6dea2f..3f7ac675 100644
--- a/kotlinx-coroutines-core/jvm/src/TimeSource.kt
+++ b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt
@@ -10,21 +10,21 @@ package kotlinx.coroutines
import java.util.concurrent.locks.*
import kotlin.internal.InlineOnly
-internal interface TimeSource {
- fun currentTimeMillis(): Long
- fun nanoTime(): Long
- fun wrapTask(block: Runnable): Runnable
- fun trackTask()
- fun unTrackTask()
- fun registerTimeLoopThread()
- fun unregisterTimeLoopThread()
- fun parkNanos(blocker: Any, nanos: Long) // should return immediately when nanos <= 0
- fun unpark(thread: Thread)
+internal abstract class AbstractTimeSource {
+ abstract fun currentTimeMillis(): Long
+ abstract fun nanoTime(): Long
+ abstract fun wrapTask(block: Runnable): Runnable
+ abstract fun trackTask()
+ abstract fun unTrackTask()
+ abstract fun registerTimeLoopThread()
+ abstract fun unregisterTimeLoopThread()
+ abstract fun parkNanos(blocker: Any, nanos: Long) // should return immediately when nanos <= 0
+ abstract fun unpark(thread: Thread)
}
// For tests only
// @JvmField: Don't use JvmField here to enable R8 optimizations via "assumenosideeffects"
-internal var timeSource: TimeSource? = null
+internal var timeSource: AbstractTimeSource? = null
@InlineOnly
internal inline fun currentTimeMillis(): Long =
diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt
index c1b878ce..edb43031 100644
--- a/kotlinx-coroutines-core/jvm/src/Builders.kt
+++ b/kotlinx-coroutines-core/jvm/src/Builders.kt
@@ -63,7 +63,8 @@ private class BlockingCoroutine<T>(
parentContext: CoroutineContext,
private val blockedThread: Thread,
private val eventLoop: EventLoop?
-) : AbstractCoroutine<T>(parentContext, true) {
+) : AbstractCoroutine<T>(parentContext, true, true) {
+
override val isScopedCoroutine: Boolean get() = true
override fun afterCompletion(state: Any?) {
diff --git a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt
index 25c0fbe9..d82598ea 100644
--- a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt
+++ b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt
@@ -107,9 +107,10 @@ public actual object Dispatchers {
*
* ### Implementation note
*
- * This dispatcher shares threads with a [Default][Dispatchers.Default] dispatcher, so using
- * `withContext(Dispatchers.IO) { ... }` does not lead to an actual switching to another thread &mdash;
- * typically execution continues in the same thread.
+ * This dispatcher shares threads with the [Default][Dispatchers.Default] dispatcher, so using
+ * `withContext(Dispatchers.IO) { ... }` when already running on the [Default][Dispatchers.Default]
+ * dispatcher does not lead to an actual switching to another thread &mdash; typically execution
+ * continues in the same thread.
* As a result of thread sharing, more than 64 (default parallelism) threads can be created (but not used)
* during operations over IO dispatcher.
*/
diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt
index 394304f2..7ea3cc68 100644
--- a/kotlinx-coroutines-core/jvm/src/Executors.kt
+++ b/kotlinx-coroutines-core/jvm/src/Executors.kt
@@ -4,6 +4,7 @@
package kotlinx.coroutines
+import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import java.io.*
import java.util.concurrent.*
@@ -39,6 +40,22 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea
/**
* Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher].
*
+ * ## Interaction with [delay] and time-based coroutines.
+ *
+ * If the given [ExecutorService] is an instance of [ScheduledExecutorService], then all time-related
+ * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
+ * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
+ * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
+ *
+ * If the given [ExecutorService] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
+ * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
+ * to reduce the memory pressure of cancelled coroutines.
+ *
+ * If the executor service is neither of this types, the separate internal thread will be used to
+ * _track_ the delay and time-related executions, but the coroutine itself will still be executed
+ * on top of the given executor.
+ *
+ * ## Rejected execution
* If the underlying executor throws [RejectedExecutionException] on
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
@@ -52,6 +69,23 @@ public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher
/**
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
*
+ * ## Interaction with [delay] and time-based coroutines.
+ *
+ * If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related
+ * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
+ * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
+ * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
+ *
+ * If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
+ * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
+ * to reduce the memory pressure of cancelled coroutines.
+ *
+ * If the executor is neither of this types, the separate internal thread will be used to
+ * _track_ the delay and time-related executions, but the coroutine itself will still be executed
+ * on top of the given executor.
+ *
+ * ## Rejected execution
+ *
* If the underlying executor throws [RejectedExecutionException] on
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
@@ -75,18 +109,15 @@ private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher)
override fun toString(): String = dispatcher.toString()
}
-private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() {
- init {
- initFutureCancellation()
- }
-}
+internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {
-internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {
-
- private var removesFutureOnCancellation: Boolean = false
-
- internal fun initFutureCancellation() {
- removesFutureOnCancellation = removeFutureOnCancel(executor)
+ /*
+ * Attempts to reflectively (to be Java 6 compatible) invoke
+ * ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup
+ * internal scheduler queue on cancellation.
+ */
+ init {
+ removeFutureOnCancel(executor)
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
@@ -99,17 +130,12 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
}
}
- /*
- * removesFutureOnCancellation is required to avoid memory leak.
- * On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine.
- * On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation.
- */
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
- val future = if (removesFutureOnCancellation) {
- scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis)
- } else {
- null
- }
+ val future = (executor as? ScheduledExecutorService)?.scheduleBlock(
+ ResumeUndispatchedRunnable(this, continuation),
+ continuation.context,
+ timeMillis
+ )
// If everything went fine and the scheduling attempt was not rejected -- use it
if (future != null) {
continuation.cancelFutureOnCancellation(future)
@@ -120,20 +146,16 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
- val future = if (removesFutureOnCancellation) {
- scheduleBlock(block, context, timeMillis)
- } else {
- null
- }
+ val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis)
return when {
future != null -> DisposableFutureHandle(future)
else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
}
}
- private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
+ private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
return try {
- (executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS)
+ schedule(block, timeMillis, TimeUnit.MILLISECONDS)
} catch (e: RejectedExecutionException) {
cancelJobOnRejection(context, e)
null
@@ -149,7 +171,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
}
override fun toString(): String = executor.toString()
- override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor
+ override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherImpl && other.executor === executor
override fun hashCode(): Int = System.identityHashCode(executor)
}
diff --git a/kotlinx-coroutines-core/jvm/src/Future.kt b/kotlinx-coroutines-core/jvm/src/Future.kt
index 948ef606..b27a9708 100644
--- a/kotlinx-coroutines-core/jvm/src/Future.kt
+++ b/kotlinx-coroutines-core/jvm/src/Future.kt
@@ -13,20 +13,20 @@ import java.util.concurrent.*
* Cancels a specified [future] when this job is cancelled.
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
* ```
- * invokeOnCompletion { future.cancel(false) }
+ * invokeOnCompletion { if (it != null) future.cancel(false) }
* ```
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public fun Job.cancelFutureOnCompletion(future: Future<*>): DisposableHandle =
- invokeOnCompletion(handler = CancelFutureOnCompletion(future)) // TODO make it work only on cancellation as well?
+ invokeOnCompletion(handler = CancelFutureOnCompletion(future))
/**
* Cancels a specified [future] when this job is cancelled.
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
* ```
- * invokeOnCancellation { future.cancel(false) }
+ * invokeOnCancellation { if (it != null) future.cancel(false) }
* ```
*/
public fun CancellableContinuation<*>.cancelFutureOnCancellation(future: Future<*>): Unit =
@@ -38,7 +38,7 @@ private class CancelFutureOnCompletion(
override fun invoke(cause: Throwable?) {
// Don't interrupt when cancelling future on completion, because no one is going to reset this
// interruption flag and it will cause spurious failures elsewhere
- future.cancel(false)
+ if (cause != null) future.cancel(false)
}
}
@@ -46,7 +46,7 @@ private class CancelFutureOnCancel(private val future: Future<*>) : CancelHandle
override fun invoke(cause: Throwable?) {
// Don't interrupt when cancelling future on completion, because no one is going to reset this
// interruption flag and it will cause spurious failures elsewhere
- future.cancel(false)
+ if (cause != null) future.cancel(false)
}
override fun toString() = "CancelFutureOnCancel[$future]"
}
diff --git a/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt b/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt
index 44a79d42..99e3b46c 100644
--- a/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt
+++ b/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt
@@ -59,40 +59,11 @@ public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
@ObsoleteCoroutinesApi
public fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher {
require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" }
- return ThreadPoolDispatcher(nThreads, name)
-}
-
-internal class PoolThread(
- @JvmField val dispatcher: ThreadPoolDispatcher, // for debugging & tests
- target: Runnable, name: String
-) : Thread(target, name) {
- init { isDaemon = true }
-}
-
-/**
- * Dispatches coroutine execution to a thread pool of a fixed size. Instances of this dispatcher are
- * created with [newSingleThreadContext] and [newFixedThreadPoolContext].
- */
-internal class ThreadPoolDispatcher internal constructor(
- private val nThreads: Int,
- private val name: String
-) : ExecutorCoroutineDispatcherBase() {
- private val threadNo = AtomicInteger()
-
- override val executor: Executor = Executors.newScheduledThreadPool(nThreads) { target ->
- PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
- }
-
- init {
- initFutureCancellation()
+ val threadNo = AtomicInteger()
+ val executor = Executors.newScheduledThreadPool(nThreads) { runnable ->
+ val t = Thread(runnable, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
+ t.isDaemon = true
+ t
}
-
- /**
- * Closes this dispatcher -- shuts down all threads in this pool and releases resources.
- */
- public override fun close() {
- (executor as ExecutorService).shutdown()
- }
-
- override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]"
+ return executor.asCoroutineDispatcher()
}
diff --git a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt
index 0212d740..96cda7b1 100644
--- a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt
+++ b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt
@@ -127,7 +127,11 @@ private open class ActorCoroutine<E>(
parentContext: CoroutineContext,
channel: Channel<E>,
active: Boolean
-) : ChannelCoroutine<E>(parentContext, channel, active), ActorScope<E> {
+) : ChannelCoroutine<E>(parentContext, channel, initParentJob = false, active = active), ActorScope<E> {
+
+ init {
+ initParentJob(parentContext[Job])
+ }
override fun onCancelling(cause: Throwable?) {
_channel.cancel(cause?.let {
@@ -159,11 +163,17 @@ private class LazyActorCoroutine<E>(
return super.send(element)
}
+ @Suppress("DEPRECATION_ERROR")
override fun offer(element: E): Boolean {
start()
return super.offer(element)
}
+ override fun trySend(element: E): ChannelResult<Unit> {
+ start()
+ return super.trySend(element)
+ }
+
override fun close(cause: Throwable?): Boolean {
// close the channel _first_
val closed = super.close(cause)
diff --git a/kotlinx-coroutines-core/jvm/src/channels/Channels.kt b/kotlinx-coroutines-core/jvm/src/channels/Channels.kt
index 081a0583..0df8278b 100644
--- a/kotlinx-coroutines-core/jvm/src/channels/Channels.kt
+++ b/kotlinx-coroutines-core/jvm/src/channels/Channels.kt
@@ -10,18 +10,87 @@ package kotlinx.coroutines.channels
import kotlinx.coroutines.*
/**
- * Adds [element] into to this channel, **blocking** the caller while this channel [Channel.isFull],
- * or throws exception if the channel [Channel.isClosedForSend] (see [Channel.close] for details).
+ * **Deprecated** blocking variant of send.
+ * This method is deprecated in the favour of [trySendBlocking].
*
- * This is a way to call [Channel.send] method inside a blocking code using [runBlocking],
- * so this function should not be used from coroutine.
+ * `sendBlocking` is a dangerous primitive &mdash; it throws an exception
+ * if the channel was closed or, more commonly, cancelled.
+ * Cancellation exceptions in non-blocking code are unexpected and frequently
+ * trigger internal failures.
+ *
+ * These bugs are hard-to-spot during code review and they forced users to write
+ * their own wrappers around `sendBlocking`.
+ * So this function is deprecated and replaced with a more explicit primitive.
+ *
+ * The real-world example of broken usage with Firebase:
+ *
+ * ```kotlin
+ * callbackFlow {
+ * val listener = object : ValueEventListener {
+ * override fun onDataChange(snapshot: DataSnapshot) {
+ * // This line may fail and crash the app when the downstream flow is cancelled
+ * sendBlocking(DataSnapshot(snapshot))
+ * }
+ *
+ * override fun onCancelled(error: DatabaseError) {
+ * close(error.toException())
+ * }
+ * }
+ *
+ * firebaseQuery.addValueEventListener(listener)
+ * awaitClose { firebaseQuery.removeEventListener(listener) }
+ * }
+ * ```
*/
+@Deprecated(
+ level = DeprecationLevel.WARNING,
+ message = "Deprecated in the favour of 'trySendBlocking'. " +
+ "Consider handling the result of 'trySendBlocking' explicitly and rethrow exception if necessary",
+ replaceWith = ReplaceWith("trySendBlocking(element)")
+)
public fun <E> SendChannel<E>.sendBlocking(element: E) {
// fast path
- if (offer(element))
+ if (trySend(element).isSuccess)
return
// slow path
runBlocking {
send(element)
}
}
+
+/**
+ * Adds [element] into to this channel, **blocking** the caller while this channel is full,
+ * and returning either [successful][ChannelResult.isSuccess] result when the element was added, or
+ * failed result representing closed channel with a corresponding exception.
+ *
+ * This is a way to call [Channel.send] method in a safe manner inside a blocking code using [runBlocking] and catching,
+ * so this function should not be used from coroutine.
+ *
+ * Example of usage:
+ *
+ * ```
+ * // From callback API
+ * channel.trySendBlocking(element)
+ * .onSuccess { /* request next element or debug log */ }
+ * .onFailure { t: Throwable? -> /* throw or log */ }
+ * ```
+ *
+ * For this operation it is guaranteed that [failure][ChannelResult.failed] always contains an exception in it.
+ *
+ * @throws [InterruptedException] if the current thread is interrupted during the blocking send operation.
+ */
+@Throws(InterruptedException::class)
+public fun <E> SendChannel<E>.trySendBlocking(element: E): ChannelResult<Unit> {
+ /*
+ * Sent successfully -- bail out.
+ * But failure may indicate either that the channel it full or that
+ * it is close. Go to slow path on failure to simplify the successful path and
+ * to materialize default exception.
+ */
+ trySend(element).onSuccess { return ChannelResult.success(Unit) }
+ return runBlocking {
+ val r = runCatching { send(element) }
+ if (r.isSuccess) ChannelResult.success(Unit)
+ else ChannelResult.closed(r.exceptionOrNull())
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt b/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt
index 099e70b3..6c23982e 100644
--- a/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt
+++ b/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt
@@ -21,13 +21,13 @@ public enum class TickerMode {
* ```
* val channel = ticker(delay = 100)
* delay(350) // 250 ms late
- * println(channel.poll()) // prints Unit
- * println(channel.poll()) // prints null
+ * println(channel.tryReceive().getOrNull()) // prints Unit
+ * println(channel.tryReceive().getOrNull()) // prints null
*
* delay(50)
- * println(channel.poll()) // prints Unit, delay was adjusted
+ * println(channel.tryReceive().getOrNull()) // prints Unit, delay was adjusted
* delay(50)
- * println(channel.poll()) // prints null, we'are not late relatively to previous element
+ * println(channel.tryReceive().getOrNull()) // prints null, we're not late relatively to previous element
* ```
*/
FIXED_PERIOD,
diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbes.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbes.kt
new file mode 100644
index 00000000..8dc5b7c2
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbes.kt
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:Suppress("unused")
+
+package kotlinx.coroutines.debug.internal
+
+import kotlin.coroutines.*
+
+/*
+ * This class is used by ByteBuddy from kotlinx-coroutines-debug as kotlin.coroutines.jvm.internal.DebugProbesKt replacement.
+ * In theory, it should belong to kotlinx-coroutines-debug, but placing it here significantly simplifies the
+ * Android AS debugger that does on-load DEX transformation
+ */
+
+// Stubs which are injected as coroutine probes. Require direct match of signatures
+internal fun probeCoroutineResumed(frame: Continuation<*>) = DebugProbesImpl.probeCoroutineResumed(frame)
+
+internal fun probeCoroutineSuspended(frame: Continuation<*>) = DebugProbesImpl.probeCoroutineSuspended(frame)
+internal fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T> =
+ DebugProbesImpl.probeCoroutineCreated(completion)
diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt
index 4c88cc97..05befc1a 100644
--- a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt
+++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt
@@ -282,7 +282,7 @@ internal object DebugProbesImpl {
it.fileName == "ContinuationImpl.kt"
}
- val (continuationStartFrame, frameSkipped) = findContinuationStartIndex(
+ val (continuationStartFrame, delta) = findContinuationStartIndex(
indexOfResumeWith,
actualTrace,
coroutineTrace
@@ -290,7 +290,6 @@ internal object DebugProbesImpl {
if (continuationStartFrame == -1) return coroutineTrace
- val delta = if (frameSkipped) 1 else 0
val expectedSize = indexOfResumeWith + coroutineTrace.size - continuationStartFrame - 1 - delta
val result = ArrayList<StackTraceElement>(expectedSize)
for (index in 0 until indexOfResumeWith - delta) {
@@ -312,16 +311,22 @@ internal object DebugProbesImpl {
* If method above `resumeWith` has no line number (thus it is `stateMachine.invokeSuspend`),
* it's skipped and attempt to match next one is made because state machine could have been missing in the original coroutine stacktrace.
*
- * Returns index of such frame (or -1) and flag indicating whether frame with state machine was skipped
+ * Returns index of such frame (or -1) and number of skipped frames (up to 2, for state machine and for access$).
*/
private fun findContinuationStartIndex(
indexOfResumeWith: Int,
actualTrace: Array<StackTraceElement>,
coroutineTrace: List<StackTraceElement>
- ): Pair<Int, Boolean> {
- val result = findIndexOfFrame(indexOfResumeWith - 1, actualTrace, coroutineTrace)
- if (result == -1) return findIndexOfFrame(indexOfResumeWith - 2, actualTrace, coroutineTrace) to true
- return result to false
+ ): Pair<Int, Int> {
+ /*
+ * Since Kotlin 1.5.0 we have these access$ methods that we have to skip.
+ * So we have to test next frame for invokeSuspend, for $access and for actual suspending call.
+ */
+ repeat(3) {
+ val result = findIndexOfFrame(indexOfResumeWith - 1 - it, actualTrace, coroutineTrace)
+ if (result != -1) return result to it
+ }
+ return -1 to 0
}
private fun findIndexOfFrame(
diff --git a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt
index 3102fdfb..2d447413 100644
--- a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt
+++ b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt
@@ -67,7 +67,10 @@ public fun MainCoroutineDispatcher.isMissing(): Boolean = this is MissingMainCor
@Suppress("MayBeConstant")
private val SUPPORT_MISSING = true
-@Suppress("ConstantConditionIf")
+@Suppress(
+ "ConstantConditionIf",
+ "IMPLICIT_NOTHING_TYPE_ARGUMENT_AGAINST_NOT_NOTHING_EXPECTED_TYPE" // KT-47626
+)
private fun createMissingDispatcher(cause: Throwable? = null, errorHint: String? = null) =
if (SUPPORT_MISSING) MissingMainCoroutineDispatcher(cause, errorHint) else
cause?.let { throw it } ?: throwMissingMainDispatcherException()
diff --git a/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt b/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt
index 48e8790c..174c57b7 100644
--- a/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt
+++ b/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt
@@ -29,7 +29,7 @@ private val stackTraceRecoveryClassName = runCatching {
internal actual fun <E : Throwable> recoverStackTrace(exception: E): E {
if (!RECOVER_STACK_TRACES) return exception
// No unwrapping on continuation-less path: exception is not reported multiple times via slow paths
- val copy = tryCopyException(exception) ?: return exception
+ val copy = tryCopyAndVerify(exception) ?: return exception
return copy.sanitizeStackTrace()
}
@@ -66,9 +66,7 @@ private fun <E : Throwable> recoverFromStackFrame(exception: E, continuation: Co
val (cause, recoveredStacktrace) = exception.causeAndStacktrace()
// Try to create an exception of the same type and get stacktrace from continuation
- val newException = tryCopyException(cause) ?: return exception
- // Verify that the new exception has the same message as the original one (bail out if not, see #1631)
- if (newException.message != cause.message) return exception
+ val newException = tryCopyAndVerify(cause) ?: return exception
// Update stacktrace
val stacktrace = createStackTrace(continuation)
if (stacktrace.isEmpty()) return exception
@@ -80,6 +78,14 @@ private fun <E : Throwable> recoverFromStackFrame(exception: E, continuation: Co
return createFinalException(cause, newException, stacktrace)
}
+private fun <E : Throwable> tryCopyAndVerify(exception: E): E? {
+ val newException = tryCopyException(exception) ?: return null
+ // Verify that the new exception has the same message as the original one (bail out if not, see #1631)
+ // CopyableThrowable has control over its message and thus can modify it the way it wants
+ if (exception !is CopyableThrowable<*> && newException.message != exception.message) return null
+ return newException
+}
+
/*
* Here we partially copy original exception stackTrace to make current one much prettier.
* E.g. for