diff options
Diffstat (limited to 'kotlinx-coroutines-core/jvm/src')
-rw-r--r-- | kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt (renamed from kotlinx-coroutines-core/jvm/src/TimeSource.kt) | 22 | ||||
-rw-r--r-- | kotlinx-coroutines-core/jvm/src/Builders.kt | 3 | ||||
-rw-r--r-- | kotlinx-coroutines-core/jvm/src/Dispatchers.kt | 7 | ||||
-rw-r--r-- | kotlinx-coroutines-core/jvm/src/Executors.kt | 80 | ||||
-rw-r--r-- | kotlinx-coroutines-core/jvm/src/Future.kt | 10 | ||||
-rw-r--r-- | kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt | 41 | ||||
-rw-r--r-- | kotlinx-coroutines-core/jvm/src/channels/Actor.kt | 12 | ||||
-rw-r--r-- | kotlinx-coroutines-core/jvm/src/channels/Channels.kt | 79 | ||||
-rw-r--r-- | kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt | 8 | ||||
-rw-r--r-- | kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbes.kt | 22 | ||||
-rw-r--r-- | kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt | 19 | ||||
-rw-r--r-- | kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt | 5 | ||||
-rw-r--r-- | kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt | 14 |
13 files changed, 216 insertions, 106 deletions
diff --git a/kotlinx-coroutines-core/jvm/src/TimeSource.kt b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt index 8d6dea2f..3f7ac675 100644 --- a/kotlinx-coroutines-core/jvm/src/TimeSource.kt +++ b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt @@ -10,21 +10,21 @@ package kotlinx.coroutines import java.util.concurrent.locks.* import kotlin.internal.InlineOnly -internal interface TimeSource { - fun currentTimeMillis(): Long - fun nanoTime(): Long - fun wrapTask(block: Runnable): Runnable - fun trackTask() - fun unTrackTask() - fun registerTimeLoopThread() - fun unregisterTimeLoopThread() - fun parkNanos(blocker: Any, nanos: Long) // should return immediately when nanos <= 0 - fun unpark(thread: Thread) +internal abstract class AbstractTimeSource { + abstract fun currentTimeMillis(): Long + abstract fun nanoTime(): Long + abstract fun wrapTask(block: Runnable): Runnable + abstract fun trackTask() + abstract fun unTrackTask() + abstract fun registerTimeLoopThread() + abstract fun unregisterTimeLoopThread() + abstract fun parkNanos(blocker: Any, nanos: Long) // should return immediately when nanos <= 0 + abstract fun unpark(thread: Thread) } // For tests only // @JvmField: Don't use JvmField here to enable R8 optimizations via "assumenosideeffects" -internal var timeSource: TimeSource? = null +internal var timeSource: AbstractTimeSource? = null @InlineOnly internal inline fun currentTimeMillis(): Long = diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt index c1b878ce..edb43031 100644 --- a/kotlinx-coroutines-core/jvm/src/Builders.kt +++ b/kotlinx-coroutines-core/jvm/src/Builders.kt @@ -63,7 +63,8 @@ private class BlockingCoroutine<T>( parentContext: CoroutineContext, private val blockedThread: Thread, private val eventLoop: EventLoop? -) : AbstractCoroutine<T>(parentContext, true) { +) : AbstractCoroutine<T>(parentContext, true, true) { + override val isScopedCoroutine: Boolean get() = true override fun afterCompletion(state: Any?) { diff --git a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt index 25c0fbe9..d82598ea 100644 --- a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt @@ -107,9 +107,10 @@ public actual object Dispatchers { * * ### Implementation note * - * This dispatcher shares threads with a [Default][Dispatchers.Default] dispatcher, so using - * `withContext(Dispatchers.IO) { ... }` does not lead to an actual switching to another thread — - * typically execution continues in the same thread. + * This dispatcher shares threads with the [Default][Dispatchers.Default] dispatcher, so using + * `withContext(Dispatchers.IO) { ... }` when already running on the [Default][Dispatchers.Default] + * dispatcher does not lead to an actual switching to another thread — typically execution + * continues in the same thread. * As a result of thread sharing, more than 64 (default parallelism) threads can be created (but not used) * during operations over IO dispatcher. */ diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index 394304f2..7ea3cc68 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines +import kotlinx.coroutines.flow.* import kotlinx.coroutines.internal.* import java.io.* import java.util.concurrent.* @@ -39,6 +40,22 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea /** * Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher]. * + * ## Interaction with [delay] and time-based coroutines. + * + * If the given [ExecutorService] is an instance of [ScheduledExecutorService], then all time-related + * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled + * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding + * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future. + * + * If the given [ExecutorService] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling, + * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order + * to reduce the memory pressure of cancelled coroutines. + * + * If the executor service is neither of this types, the separate internal thread will be used to + * _track_ the delay and time-related executions, but the coroutine itself will still be executed + * on top of the given executor. + * + * ## Rejected execution * If the underlying executor throws [RejectedExecutionException] on * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), @@ -52,6 +69,23 @@ public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher /** * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher]. * + * ## Interaction with [delay] and time-based coroutines. + * + * If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related + * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled + * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding + * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future. + * + * If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling, + * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order + * to reduce the memory pressure of cancelled coroutines. + * + * If the executor is neither of this types, the separate internal thread will be used to + * _track_ the delay and time-related executions, but the coroutine itself will still be executed + * on top of the given executor. + * + * ## Rejected execution + * * If the underlying executor throws [RejectedExecutionException] on * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), @@ -75,18 +109,15 @@ private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) override fun toString(): String = dispatcher.toString() } -private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() { - init { - initFutureCancellation() - } -} +internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay { -internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay { - - private var removesFutureOnCancellation: Boolean = false - - internal fun initFutureCancellation() { - removesFutureOnCancellation = removeFutureOnCancel(executor) + /* + * Attempts to reflectively (to be Java 6 compatible) invoke + * ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup + * internal scheduler queue on cancellation. + */ + init { + removeFutureOnCancel(executor) } override fun dispatch(context: CoroutineContext, block: Runnable) { @@ -99,17 +130,12 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa } } - /* - * removesFutureOnCancellation is required to avoid memory leak. - * On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine. - * On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation. - */ override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { - val future = if (removesFutureOnCancellation) { - scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis) - } else { - null - } + val future = (executor as? ScheduledExecutorService)?.scheduleBlock( + ResumeUndispatchedRunnable(this, continuation), + continuation.context, + timeMillis + ) // If everything went fine and the scheduling attempt was not rejected -- use it if (future != null) { continuation.cancelFutureOnCancellation(future) @@ -120,20 +146,16 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa } override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val future = if (removesFutureOnCancellation) { - scheduleBlock(block, context, timeMillis) - } else { - null - } + val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis) return when { future != null -> DisposableFutureHandle(future) else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context) } } - private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { + private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { return try { - (executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS) + schedule(block, timeMillis, TimeUnit.MILLISECONDS) } catch (e: RejectedExecutionException) { cancelJobOnRejection(context, e) null @@ -149,7 +171,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa } override fun toString(): String = executor.toString() - override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor + override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherImpl && other.executor === executor override fun hashCode(): Int = System.identityHashCode(executor) } diff --git a/kotlinx-coroutines-core/jvm/src/Future.kt b/kotlinx-coroutines-core/jvm/src/Future.kt index 948ef606..b27a9708 100644 --- a/kotlinx-coroutines-core/jvm/src/Future.kt +++ b/kotlinx-coroutines-core/jvm/src/Future.kt @@ -13,20 +13,20 @@ import java.util.concurrent.* * Cancels a specified [future] when this job is cancelled. * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created). * ``` - * invokeOnCompletion { future.cancel(false) } + * invokeOnCompletion { if (it != null) future.cancel(false) } * ``` * * @suppress **This an internal API and should not be used from general code.** */ @InternalCoroutinesApi public fun Job.cancelFutureOnCompletion(future: Future<*>): DisposableHandle = - invokeOnCompletion(handler = CancelFutureOnCompletion(future)) // TODO make it work only on cancellation as well? + invokeOnCompletion(handler = CancelFutureOnCompletion(future)) /** * Cancels a specified [future] when this job is cancelled. * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created). * ``` - * invokeOnCancellation { future.cancel(false) } + * invokeOnCancellation { if (it != null) future.cancel(false) } * ``` */ public fun CancellableContinuation<*>.cancelFutureOnCancellation(future: Future<*>): Unit = @@ -38,7 +38,7 @@ private class CancelFutureOnCompletion( override fun invoke(cause: Throwable?) { // Don't interrupt when cancelling future on completion, because no one is going to reset this // interruption flag and it will cause spurious failures elsewhere - future.cancel(false) + if (cause != null) future.cancel(false) } } @@ -46,7 +46,7 @@ private class CancelFutureOnCancel(private val future: Future<*>) : CancelHandle override fun invoke(cause: Throwable?) { // Don't interrupt when cancelling future on completion, because no one is going to reset this // interruption flag and it will cause spurious failures elsewhere - future.cancel(false) + if (cause != null) future.cancel(false) } override fun toString() = "CancelFutureOnCancel[$future]" } diff --git a/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt b/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt index 44a79d42..99e3b46c 100644 --- a/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt @@ -59,40 +59,11 @@ public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher = @ObsoleteCoroutinesApi public fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher { require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" } - return ThreadPoolDispatcher(nThreads, name) -} - -internal class PoolThread( - @JvmField val dispatcher: ThreadPoolDispatcher, // for debugging & tests - target: Runnable, name: String -) : Thread(target, name) { - init { isDaemon = true } -} - -/** - * Dispatches coroutine execution to a thread pool of a fixed size. Instances of this dispatcher are - * created with [newSingleThreadContext] and [newFixedThreadPoolContext]. - */ -internal class ThreadPoolDispatcher internal constructor( - private val nThreads: Int, - private val name: String -) : ExecutorCoroutineDispatcherBase() { - private val threadNo = AtomicInteger() - - override val executor: Executor = Executors.newScheduledThreadPool(nThreads) { target -> - PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet()) - } - - init { - initFutureCancellation() + val threadNo = AtomicInteger() + val executor = Executors.newScheduledThreadPool(nThreads) { runnable -> + val t = Thread(runnable, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet()) + t.isDaemon = true + t } - - /** - * Closes this dispatcher -- shuts down all threads in this pool and releases resources. - */ - public override fun close() { - (executor as ExecutorService).shutdown() - } - - override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]" + return executor.asCoroutineDispatcher() } diff --git a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt index 0212d740..96cda7b1 100644 --- a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt +++ b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt @@ -127,7 +127,11 @@ private open class ActorCoroutine<E>( parentContext: CoroutineContext, channel: Channel<E>, active: Boolean -) : ChannelCoroutine<E>(parentContext, channel, active), ActorScope<E> { +) : ChannelCoroutine<E>(parentContext, channel, initParentJob = false, active = active), ActorScope<E> { + + init { + initParentJob(parentContext[Job]) + } override fun onCancelling(cause: Throwable?) { _channel.cancel(cause?.let { @@ -159,11 +163,17 @@ private class LazyActorCoroutine<E>( return super.send(element) } + @Suppress("DEPRECATION_ERROR") override fun offer(element: E): Boolean { start() return super.offer(element) } + override fun trySend(element: E): ChannelResult<Unit> { + start() + return super.trySend(element) + } + override fun close(cause: Throwable?): Boolean { // close the channel _first_ val closed = super.close(cause) diff --git a/kotlinx-coroutines-core/jvm/src/channels/Channels.kt b/kotlinx-coroutines-core/jvm/src/channels/Channels.kt index 081a0583..0df8278b 100644 --- a/kotlinx-coroutines-core/jvm/src/channels/Channels.kt +++ b/kotlinx-coroutines-core/jvm/src/channels/Channels.kt @@ -10,18 +10,87 @@ package kotlinx.coroutines.channels import kotlinx.coroutines.* /** - * Adds [element] into to this channel, **blocking** the caller while this channel [Channel.isFull], - * or throws exception if the channel [Channel.isClosedForSend] (see [Channel.close] for details). + * **Deprecated** blocking variant of send. + * This method is deprecated in the favour of [trySendBlocking]. * - * This is a way to call [Channel.send] method inside a blocking code using [runBlocking], - * so this function should not be used from coroutine. + * `sendBlocking` is a dangerous primitive — it throws an exception + * if the channel was closed or, more commonly, cancelled. + * Cancellation exceptions in non-blocking code are unexpected and frequently + * trigger internal failures. + * + * These bugs are hard-to-spot during code review and they forced users to write + * their own wrappers around `sendBlocking`. + * So this function is deprecated and replaced with a more explicit primitive. + * + * The real-world example of broken usage with Firebase: + * + * ```kotlin + * callbackFlow { + * val listener = object : ValueEventListener { + * override fun onDataChange(snapshot: DataSnapshot) { + * // This line may fail and crash the app when the downstream flow is cancelled + * sendBlocking(DataSnapshot(snapshot)) + * } + * + * override fun onCancelled(error: DatabaseError) { + * close(error.toException()) + * } + * } + * + * firebaseQuery.addValueEventListener(listener) + * awaitClose { firebaseQuery.removeEventListener(listener) } + * } + * ``` */ +@Deprecated( + level = DeprecationLevel.WARNING, + message = "Deprecated in the favour of 'trySendBlocking'. " + + "Consider handling the result of 'trySendBlocking' explicitly and rethrow exception if necessary", + replaceWith = ReplaceWith("trySendBlocking(element)") +) public fun <E> SendChannel<E>.sendBlocking(element: E) { // fast path - if (offer(element)) + if (trySend(element).isSuccess) return // slow path runBlocking { send(element) } } + +/** + * Adds [element] into to this channel, **blocking** the caller while this channel is full, + * and returning either [successful][ChannelResult.isSuccess] result when the element was added, or + * failed result representing closed channel with a corresponding exception. + * + * This is a way to call [Channel.send] method in a safe manner inside a blocking code using [runBlocking] and catching, + * so this function should not be used from coroutine. + * + * Example of usage: + * + * ``` + * // From callback API + * channel.trySendBlocking(element) + * .onSuccess { /* request next element or debug log */ } + * .onFailure { t: Throwable? -> /* throw or log */ } + * ``` + * + * For this operation it is guaranteed that [failure][ChannelResult.failed] always contains an exception in it. + * + * @throws [InterruptedException] if the current thread is interrupted during the blocking send operation. + */ +@Throws(InterruptedException::class) +public fun <E> SendChannel<E>.trySendBlocking(element: E): ChannelResult<Unit> { + /* + * Sent successfully -- bail out. + * But failure may indicate either that the channel it full or that + * it is close. Go to slow path on failure to simplify the successful path and + * to materialize default exception. + */ + trySend(element).onSuccess { return ChannelResult.success(Unit) } + return runBlocking { + val r = runCatching { send(element) } + if (r.isSuccess) ChannelResult.success(Unit) + else ChannelResult.closed(r.exceptionOrNull()) + } +} diff --git a/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt b/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt index 099e70b3..6c23982e 100644 --- a/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt +++ b/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt @@ -21,13 +21,13 @@ public enum class TickerMode { * ``` * val channel = ticker(delay = 100) * delay(350) // 250 ms late - * println(channel.poll()) // prints Unit - * println(channel.poll()) // prints null + * println(channel.tryReceive().getOrNull()) // prints Unit + * println(channel.tryReceive().getOrNull()) // prints null * * delay(50) - * println(channel.poll()) // prints Unit, delay was adjusted + * println(channel.tryReceive().getOrNull()) // prints Unit, delay was adjusted * delay(50) - * println(channel.poll()) // prints null, we'are not late relatively to previous element + * println(channel.tryReceive().getOrNull()) // prints null, we're not late relatively to previous element * ``` */ FIXED_PERIOD, diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbes.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbes.kt new file mode 100644 index 00000000..8dc5b7c2 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbes.kt @@ -0,0 +1,22 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:Suppress("unused") + +package kotlinx.coroutines.debug.internal + +import kotlin.coroutines.* + +/* + * This class is used by ByteBuddy from kotlinx-coroutines-debug as kotlin.coroutines.jvm.internal.DebugProbesKt replacement. + * In theory, it should belong to kotlinx-coroutines-debug, but placing it here significantly simplifies the + * Android AS debugger that does on-load DEX transformation + */ + +// Stubs which are injected as coroutine probes. Require direct match of signatures +internal fun probeCoroutineResumed(frame: Continuation<*>) = DebugProbesImpl.probeCoroutineResumed(frame) + +internal fun probeCoroutineSuspended(frame: Continuation<*>) = DebugProbesImpl.probeCoroutineSuspended(frame) +internal fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T> = + DebugProbesImpl.probeCoroutineCreated(completion) diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt index 4c88cc97..05befc1a 100644 --- a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt +++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt @@ -282,7 +282,7 @@ internal object DebugProbesImpl { it.fileName == "ContinuationImpl.kt" } - val (continuationStartFrame, frameSkipped) = findContinuationStartIndex( + val (continuationStartFrame, delta) = findContinuationStartIndex( indexOfResumeWith, actualTrace, coroutineTrace @@ -290,7 +290,6 @@ internal object DebugProbesImpl { if (continuationStartFrame == -1) return coroutineTrace - val delta = if (frameSkipped) 1 else 0 val expectedSize = indexOfResumeWith + coroutineTrace.size - continuationStartFrame - 1 - delta val result = ArrayList<StackTraceElement>(expectedSize) for (index in 0 until indexOfResumeWith - delta) { @@ -312,16 +311,22 @@ internal object DebugProbesImpl { * If method above `resumeWith` has no line number (thus it is `stateMachine.invokeSuspend`), * it's skipped and attempt to match next one is made because state machine could have been missing in the original coroutine stacktrace. * - * Returns index of such frame (or -1) and flag indicating whether frame with state machine was skipped + * Returns index of such frame (or -1) and number of skipped frames (up to 2, for state machine and for access$). */ private fun findContinuationStartIndex( indexOfResumeWith: Int, actualTrace: Array<StackTraceElement>, coroutineTrace: List<StackTraceElement> - ): Pair<Int, Boolean> { - val result = findIndexOfFrame(indexOfResumeWith - 1, actualTrace, coroutineTrace) - if (result == -1) return findIndexOfFrame(indexOfResumeWith - 2, actualTrace, coroutineTrace) to true - return result to false + ): Pair<Int, Int> { + /* + * Since Kotlin 1.5.0 we have these access$ methods that we have to skip. + * So we have to test next frame for invokeSuspend, for $access and for actual suspending call. + */ + repeat(3) { + val result = findIndexOfFrame(indexOfResumeWith - 1 - it, actualTrace, coroutineTrace) + if (result != -1) return result to it + } + return -1 to 0 } private fun findIndexOfFrame( diff --git a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt index 3102fdfb..2d447413 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt @@ -67,7 +67,10 @@ public fun MainCoroutineDispatcher.isMissing(): Boolean = this is MissingMainCor @Suppress("MayBeConstant") private val SUPPORT_MISSING = true -@Suppress("ConstantConditionIf") +@Suppress( + "ConstantConditionIf", + "IMPLICIT_NOTHING_TYPE_ARGUMENT_AGAINST_NOT_NOTHING_EXPECTED_TYPE" // KT-47626 +) private fun createMissingDispatcher(cause: Throwable? = null, errorHint: String? = null) = if (SUPPORT_MISSING) MissingMainCoroutineDispatcher(cause, errorHint) else cause?.let { throw it } ?: throwMissingMainDispatcherException() diff --git a/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt b/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt index 48e8790c..174c57b7 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt @@ -29,7 +29,7 @@ private val stackTraceRecoveryClassName = runCatching { internal actual fun <E : Throwable> recoverStackTrace(exception: E): E { if (!RECOVER_STACK_TRACES) return exception // No unwrapping on continuation-less path: exception is not reported multiple times via slow paths - val copy = tryCopyException(exception) ?: return exception + val copy = tryCopyAndVerify(exception) ?: return exception return copy.sanitizeStackTrace() } @@ -66,9 +66,7 @@ private fun <E : Throwable> recoverFromStackFrame(exception: E, continuation: Co val (cause, recoveredStacktrace) = exception.causeAndStacktrace() // Try to create an exception of the same type and get stacktrace from continuation - val newException = tryCopyException(cause) ?: return exception - // Verify that the new exception has the same message as the original one (bail out if not, see #1631) - if (newException.message != cause.message) return exception + val newException = tryCopyAndVerify(cause) ?: return exception // Update stacktrace val stacktrace = createStackTrace(continuation) if (stacktrace.isEmpty()) return exception @@ -80,6 +78,14 @@ private fun <E : Throwable> recoverFromStackFrame(exception: E, continuation: Co return createFinalException(cause, newException, stacktrace) } +private fun <E : Throwable> tryCopyAndVerify(exception: E): E? { + val newException = tryCopyException(exception) ?: return null + // Verify that the new exception has the same message as the original one (bail out if not, see #1631) + // CopyableThrowable has control over its message and thus can modify it the way it wants + if (exception !is CopyableThrowable<*> && newException.message != exception.message) return null + return newException +} + /* * Here we partially copy original exception stackTrace to make current one much prettier. * E.g. for |