diff options
Diffstat (limited to 'kotlinx-coroutines-core/jvm')
79 files changed, 910 insertions, 468 deletions
diff --git a/kotlinx-coroutines-core/jvm/resources/DebugProbesKt.bin b/kotlinx-coroutines-core/jvm/resources/DebugProbesKt.bin Binary files differindex ea8c9da4..397aaf67 100644 --- a/kotlinx-coroutines-core/jvm/resources/DebugProbesKt.bin +++ b/kotlinx-coroutines-core/jvm/resources/DebugProbesKt.bin diff --git a/kotlinx-coroutines-core/jvm/src/TimeSource.kt b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt index 8d6dea2f..3f7ac675 100644 --- a/kotlinx-coroutines-core/jvm/src/TimeSource.kt +++ b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt @@ -10,21 +10,21 @@ package kotlinx.coroutines import java.util.concurrent.locks.* import kotlin.internal.InlineOnly -internal interface TimeSource { - fun currentTimeMillis(): Long - fun nanoTime(): Long - fun wrapTask(block: Runnable): Runnable - fun trackTask() - fun unTrackTask() - fun registerTimeLoopThread() - fun unregisterTimeLoopThread() - fun parkNanos(blocker: Any, nanos: Long) // should return immediately when nanos <= 0 - fun unpark(thread: Thread) +internal abstract class AbstractTimeSource { + abstract fun currentTimeMillis(): Long + abstract fun nanoTime(): Long + abstract fun wrapTask(block: Runnable): Runnable + abstract fun trackTask() + abstract fun unTrackTask() + abstract fun registerTimeLoopThread() + abstract fun unregisterTimeLoopThread() + abstract fun parkNanos(blocker: Any, nanos: Long) // should return immediately when nanos <= 0 + abstract fun unpark(thread: Thread) } // For tests only // @JvmField: Don't use JvmField here to enable R8 optimizations via "assumenosideeffects" -internal var timeSource: TimeSource? = null +internal var timeSource: AbstractTimeSource? = null @InlineOnly internal inline fun currentTimeMillis(): Long = diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt index c1b878ce..edb43031 100644 --- a/kotlinx-coroutines-core/jvm/src/Builders.kt +++ b/kotlinx-coroutines-core/jvm/src/Builders.kt @@ -63,7 +63,8 @@ private class BlockingCoroutine<T>( parentContext: CoroutineContext, private val blockedThread: Thread, private val eventLoop: EventLoop? -) : AbstractCoroutine<T>(parentContext, true) { +) : AbstractCoroutine<T>(parentContext, true, true) { + override val isScopedCoroutine: Boolean get() = true override fun afterCompletion(state: Any?) { diff --git a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt index 25c0fbe9..d82598ea 100644 --- a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt @@ -107,9 +107,10 @@ public actual object Dispatchers { * * ### Implementation note * - * This dispatcher shares threads with a [Default][Dispatchers.Default] dispatcher, so using - * `withContext(Dispatchers.IO) { ... }` does not lead to an actual switching to another thread — - * typically execution continues in the same thread. + * This dispatcher shares threads with the [Default][Dispatchers.Default] dispatcher, so using + * `withContext(Dispatchers.IO) { ... }` when already running on the [Default][Dispatchers.Default] + * dispatcher does not lead to an actual switching to another thread — typically execution + * continues in the same thread. * As a result of thread sharing, more than 64 (default parallelism) threads can be created (but not used) * during operations over IO dispatcher. */ diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index 394304f2..7ea3cc68 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines +import kotlinx.coroutines.flow.* import kotlinx.coroutines.internal.* import java.io.* import java.util.concurrent.* @@ -39,6 +40,22 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea /** * Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher]. * + * ## Interaction with [delay] and time-based coroutines. + * + * If the given [ExecutorService] is an instance of [ScheduledExecutorService], then all time-related + * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled + * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding + * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future. + * + * If the given [ExecutorService] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling, + * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order + * to reduce the memory pressure of cancelled coroutines. + * + * If the executor service is neither of this types, the separate internal thread will be used to + * _track_ the delay and time-related executions, but the coroutine itself will still be executed + * on top of the given executor. + * + * ## Rejected execution * If the underlying executor throws [RejectedExecutionException] on * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), @@ -52,6 +69,23 @@ public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher /** * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher]. * + * ## Interaction with [delay] and time-based coroutines. + * + * If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related + * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled + * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding + * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future. + * + * If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling, + * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order + * to reduce the memory pressure of cancelled coroutines. + * + * If the executor is neither of this types, the separate internal thread will be used to + * _track_ the delay and time-related executions, but the coroutine itself will still be executed + * on top of the given executor. + * + * ## Rejected execution + * * If the underlying executor throws [RejectedExecutionException] on * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), @@ -75,18 +109,15 @@ private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) override fun toString(): String = dispatcher.toString() } -private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() { - init { - initFutureCancellation() - } -} +internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay { -internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay { - - private var removesFutureOnCancellation: Boolean = false - - internal fun initFutureCancellation() { - removesFutureOnCancellation = removeFutureOnCancel(executor) + /* + * Attempts to reflectively (to be Java 6 compatible) invoke + * ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup + * internal scheduler queue on cancellation. + */ + init { + removeFutureOnCancel(executor) } override fun dispatch(context: CoroutineContext, block: Runnable) { @@ -99,17 +130,12 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa } } - /* - * removesFutureOnCancellation is required to avoid memory leak. - * On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine. - * On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation. - */ override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { - val future = if (removesFutureOnCancellation) { - scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis) - } else { - null - } + val future = (executor as? ScheduledExecutorService)?.scheduleBlock( + ResumeUndispatchedRunnable(this, continuation), + continuation.context, + timeMillis + ) // If everything went fine and the scheduling attempt was not rejected -- use it if (future != null) { continuation.cancelFutureOnCancellation(future) @@ -120,20 +146,16 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa } override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val future = if (removesFutureOnCancellation) { - scheduleBlock(block, context, timeMillis) - } else { - null - } + val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis) return when { future != null -> DisposableFutureHandle(future) else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context) } } - private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { + private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { return try { - (executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS) + schedule(block, timeMillis, TimeUnit.MILLISECONDS) } catch (e: RejectedExecutionException) { cancelJobOnRejection(context, e) null @@ -149,7 +171,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa } override fun toString(): String = executor.toString() - override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor + override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherImpl && other.executor === executor override fun hashCode(): Int = System.identityHashCode(executor) } diff --git a/kotlinx-coroutines-core/jvm/src/Future.kt b/kotlinx-coroutines-core/jvm/src/Future.kt index 948ef606..b27a9708 100644 --- a/kotlinx-coroutines-core/jvm/src/Future.kt +++ b/kotlinx-coroutines-core/jvm/src/Future.kt @@ -13,20 +13,20 @@ import java.util.concurrent.* * Cancels a specified [future] when this job is cancelled. * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created). * ``` - * invokeOnCompletion { future.cancel(false) } + * invokeOnCompletion { if (it != null) future.cancel(false) } * ``` * * @suppress **This an internal API and should not be used from general code.** */ @InternalCoroutinesApi public fun Job.cancelFutureOnCompletion(future: Future<*>): DisposableHandle = - invokeOnCompletion(handler = CancelFutureOnCompletion(future)) // TODO make it work only on cancellation as well? + invokeOnCompletion(handler = CancelFutureOnCompletion(future)) /** * Cancels a specified [future] when this job is cancelled. * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created). * ``` - * invokeOnCancellation { future.cancel(false) } + * invokeOnCancellation { if (it != null) future.cancel(false) } * ``` */ public fun CancellableContinuation<*>.cancelFutureOnCancellation(future: Future<*>): Unit = @@ -38,7 +38,7 @@ private class CancelFutureOnCompletion( override fun invoke(cause: Throwable?) { // Don't interrupt when cancelling future on completion, because no one is going to reset this // interruption flag and it will cause spurious failures elsewhere - future.cancel(false) + if (cause != null) future.cancel(false) } } @@ -46,7 +46,7 @@ private class CancelFutureOnCancel(private val future: Future<*>) : CancelHandle override fun invoke(cause: Throwable?) { // Don't interrupt when cancelling future on completion, because no one is going to reset this // interruption flag and it will cause spurious failures elsewhere - future.cancel(false) + if (cause != null) future.cancel(false) } override fun toString() = "CancelFutureOnCancel[$future]" } diff --git a/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt b/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt index 44a79d42..99e3b46c 100644 --- a/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt @@ -59,40 +59,11 @@ public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher = @ObsoleteCoroutinesApi public fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher { require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" } - return ThreadPoolDispatcher(nThreads, name) -} - -internal class PoolThread( - @JvmField val dispatcher: ThreadPoolDispatcher, // for debugging & tests - target: Runnable, name: String -) : Thread(target, name) { - init { isDaemon = true } -} - -/** - * Dispatches coroutine execution to a thread pool of a fixed size. Instances of this dispatcher are - * created with [newSingleThreadContext] and [newFixedThreadPoolContext]. - */ -internal class ThreadPoolDispatcher internal constructor( - private val nThreads: Int, - private val name: String -) : ExecutorCoroutineDispatcherBase() { - private val threadNo = AtomicInteger() - - override val executor: Executor = Executors.newScheduledThreadPool(nThreads) { target -> - PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet()) - } - - init { - initFutureCancellation() + val threadNo = AtomicInteger() + val executor = Executors.newScheduledThreadPool(nThreads) { runnable -> + val t = Thread(runnable, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet()) + t.isDaemon = true + t } - - /** - * Closes this dispatcher -- shuts down all threads in this pool and releases resources. - */ - public override fun close() { - (executor as ExecutorService).shutdown() - } - - override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]" + return executor.asCoroutineDispatcher() } diff --git a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt index 0212d740..96cda7b1 100644 --- a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt +++ b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt @@ -127,7 +127,11 @@ private open class ActorCoroutine<E>( parentContext: CoroutineContext, channel: Channel<E>, active: Boolean -) : ChannelCoroutine<E>(parentContext, channel, active), ActorScope<E> { +) : ChannelCoroutine<E>(parentContext, channel, initParentJob = false, active = active), ActorScope<E> { + + init { + initParentJob(parentContext[Job]) + } override fun onCancelling(cause: Throwable?) { _channel.cancel(cause?.let { @@ -159,11 +163,17 @@ private class LazyActorCoroutine<E>( return super.send(element) } + @Suppress("DEPRECATION_ERROR") override fun offer(element: E): Boolean { start() return super.offer(element) } + override fun trySend(element: E): ChannelResult<Unit> { + start() + return super.trySend(element) + } + override fun close(cause: Throwable?): Boolean { // close the channel _first_ val closed = super.close(cause) diff --git a/kotlinx-coroutines-core/jvm/src/channels/Channels.kt b/kotlinx-coroutines-core/jvm/src/channels/Channels.kt index 081a0583..0df8278b 100644 --- a/kotlinx-coroutines-core/jvm/src/channels/Channels.kt +++ b/kotlinx-coroutines-core/jvm/src/channels/Channels.kt @@ -10,18 +10,87 @@ package kotlinx.coroutines.channels import kotlinx.coroutines.* /** - * Adds [element] into to this channel, **blocking** the caller while this channel [Channel.isFull], - * or throws exception if the channel [Channel.isClosedForSend] (see [Channel.close] for details). + * **Deprecated** blocking variant of send. + * This method is deprecated in the favour of [trySendBlocking]. * - * This is a way to call [Channel.send] method inside a blocking code using [runBlocking], - * so this function should not be used from coroutine. + * `sendBlocking` is a dangerous primitive — it throws an exception + * if the channel was closed or, more commonly, cancelled. + * Cancellation exceptions in non-blocking code are unexpected and frequently + * trigger internal failures. + * + * These bugs are hard-to-spot during code review and they forced users to write + * their own wrappers around `sendBlocking`. + * So this function is deprecated and replaced with a more explicit primitive. + * + * The real-world example of broken usage with Firebase: + * + * ```kotlin + * callbackFlow { + * val listener = object : ValueEventListener { + * override fun onDataChange(snapshot: DataSnapshot) { + * // This line may fail and crash the app when the downstream flow is cancelled + * sendBlocking(DataSnapshot(snapshot)) + * } + * + * override fun onCancelled(error: DatabaseError) { + * close(error.toException()) + * } + * } + * + * firebaseQuery.addValueEventListener(listener) + * awaitClose { firebaseQuery.removeEventListener(listener) } + * } + * ``` */ +@Deprecated( + level = DeprecationLevel.WARNING, + message = "Deprecated in the favour of 'trySendBlocking'. " + + "Consider handling the result of 'trySendBlocking' explicitly and rethrow exception if necessary", + replaceWith = ReplaceWith("trySendBlocking(element)") +) public fun <E> SendChannel<E>.sendBlocking(element: E) { // fast path - if (offer(element)) + if (trySend(element).isSuccess) return // slow path runBlocking { send(element) } } + +/** + * Adds [element] into to this channel, **blocking** the caller while this channel is full, + * and returning either [successful][ChannelResult.isSuccess] result when the element was added, or + * failed result representing closed channel with a corresponding exception. + * + * This is a way to call [Channel.send] method in a safe manner inside a blocking code using [runBlocking] and catching, + * so this function should not be used from coroutine. + * + * Example of usage: + * + * ``` + * // From callback API + * channel.trySendBlocking(element) + * .onSuccess { /* request next element or debug log */ } + * .onFailure { t: Throwable? -> /* throw or log */ } + * ``` + * + * For this operation it is guaranteed that [failure][ChannelResult.failed] always contains an exception in it. + * + * @throws [InterruptedException] if the current thread is interrupted during the blocking send operation. + */ +@Throws(InterruptedException::class) +public fun <E> SendChannel<E>.trySendBlocking(element: E): ChannelResult<Unit> { + /* + * Sent successfully -- bail out. + * But failure may indicate either that the channel it full or that + * it is close. Go to slow path on failure to simplify the successful path and + * to materialize default exception. + */ + trySend(element).onSuccess { return ChannelResult.success(Unit) } + return runBlocking { + val r = runCatching { send(element) } + if (r.isSuccess) ChannelResult.success(Unit) + else ChannelResult.closed(r.exceptionOrNull()) + } +} diff --git a/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt b/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt index 099e70b3..6c23982e 100644 --- a/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt +++ b/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt @@ -21,13 +21,13 @@ public enum class TickerMode { * ``` * val channel = ticker(delay = 100) * delay(350) // 250 ms late - * println(channel.poll()) // prints Unit - * println(channel.poll()) // prints null + * println(channel.tryReceive().getOrNull()) // prints Unit + * println(channel.tryReceive().getOrNull()) // prints null * * delay(50) - * println(channel.poll()) // prints Unit, delay was adjusted + * println(channel.tryReceive().getOrNull()) // prints Unit, delay was adjusted * delay(50) - * println(channel.poll()) // prints null, we'are not late relatively to previous element + * println(channel.tryReceive().getOrNull()) // prints null, we're not late relatively to previous element * ``` */ FIXED_PERIOD, diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbes.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbes.kt new file mode 100644 index 00000000..8dc5b7c2 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbes.kt @@ -0,0 +1,22 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:Suppress("unused") + +package kotlinx.coroutines.debug.internal + +import kotlin.coroutines.* + +/* + * This class is used by ByteBuddy from kotlinx-coroutines-debug as kotlin.coroutines.jvm.internal.DebugProbesKt replacement. + * In theory, it should belong to kotlinx-coroutines-debug, but placing it here significantly simplifies the + * Android AS debugger that does on-load DEX transformation + */ + +// Stubs which are injected as coroutine probes. Require direct match of signatures +internal fun probeCoroutineResumed(frame: Continuation<*>) = DebugProbesImpl.probeCoroutineResumed(frame) + +internal fun probeCoroutineSuspended(frame: Continuation<*>) = DebugProbesImpl.probeCoroutineSuspended(frame) +internal fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T> = + DebugProbesImpl.probeCoroutineCreated(completion) diff --git a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt index 4c88cc97..05befc1a 100644 --- a/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt +++ b/kotlinx-coroutines-core/jvm/src/debug/internal/DebugProbesImpl.kt @@ -282,7 +282,7 @@ internal object DebugProbesImpl { it.fileName == "ContinuationImpl.kt" } - val (continuationStartFrame, frameSkipped) = findContinuationStartIndex( + val (continuationStartFrame, delta) = findContinuationStartIndex( indexOfResumeWith, actualTrace, coroutineTrace @@ -290,7 +290,6 @@ internal object DebugProbesImpl { if (continuationStartFrame == -1) return coroutineTrace - val delta = if (frameSkipped) 1 else 0 val expectedSize = indexOfResumeWith + coroutineTrace.size - continuationStartFrame - 1 - delta val result = ArrayList<StackTraceElement>(expectedSize) for (index in 0 until indexOfResumeWith - delta) { @@ -312,16 +311,22 @@ internal object DebugProbesImpl { * If method above `resumeWith` has no line number (thus it is `stateMachine.invokeSuspend`), * it's skipped and attempt to match next one is made because state machine could have been missing in the original coroutine stacktrace. * - * Returns index of such frame (or -1) and flag indicating whether frame with state machine was skipped + * Returns index of such frame (or -1) and number of skipped frames (up to 2, for state machine and for access$). */ private fun findContinuationStartIndex( indexOfResumeWith: Int, actualTrace: Array<StackTraceElement>, coroutineTrace: List<StackTraceElement> - ): Pair<Int, Boolean> { - val result = findIndexOfFrame(indexOfResumeWith - 1, actualTrace, coroutineTrace) - if (result == -1) return findIndexOfFrame(indexOfResumeWith - 2, actualTrace, coroutineTrace) to true - return result to false + ): Pair<Int, Int> { + /* + * Since Kotlin 1.5.0 we have these access$ methods that we have to skip. + * So we have to test next frame for invokeSuspend, for $access and for actual suspending call. + */ + repeat(3) { + val result = findIndexOfFrame(indexOfResumeWith - 1 - it, actualTrace, coroutineTrace) + if (result != -1) return result to it + } + return -1 to 0 } private fun findIndexOfFrame( diff --git a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt index 3102fdfb..2d447413 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt @@ -67,7 +67,10 @@ public fun MainCoroutineDispatcher.isMissing(): Boolean = this is MissingMainCor @Suppress("MayBeConstant") private val SUPPORT_MISSING = true -@Suppress("ConstantConditionIf") +@Suppress( + "ConstantConditionIf", + "IMPLICIT_NOTHING_TYPE_ARGUMENT_AGAINST_NOT_NOTHING_EXPECTED_TYPE" // KT-47626 +) private fun createMissingDispatcher(cause: Throwable? = null, errorHint: String? = null) = if (SUPPORT_MISSING) MissingMainCoroutineDispatcher(cause, errorHint) else cause?.let { throw it } ?: throwMissingMainDispatcherException() diff --git a/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt b/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt index 48e8790c..174c57b7 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/StackTraceRecovery.kt @@ -29,7 +29,7 @@ private val stackTraceRecoveryClassName = runCatching { internal actual fun <E : Throwable> recoverStackTrace(exception: E): E { if (!RECOVER_STACK_TRACES) return exception // No unwrapping on continuation-less path: exception is not reported multiple times via slow paths - val copy = tryCopyException(exception) ?: return exception + val copy = tryCopyAndVerify(exception) ?: return exception return copy.sanitizeStackTrace() } @@ -66,9 +66,7 @@ private fun <E : Throwable> recoverFromStackFrame(exception: E, continuation: Co val (cause, recoveredStacktrace) = exception.causeAndStacktrace() // Try to create an exception of the same type and get stacktrace from continuation - val newException = tryCopyException(cause) ?: return exception - // Verify that the new exception has the same message as the original one (bail out if not, see #1631) - if (newException.message != cause.message) return exception + val newException = tryCopyAndVerify(cause) ?: return exception // Update stacktrace val stacktrace = createStackTrace(continuation) if (stacktrace.isEmpty()) return exception @@ -80,6 +78,14 @@ private fun <E : Throwable> recoverFromStackFrame(exception: E, continuation: Co return createFinalException(cause, newException, stacktrace) } +private fun <E : Throwable> tryCopyAndVerify(exception: E): E? { + val newException = tryCopyException(exception) ?: return null + // Verify that the new exception has the same message as the original one (bail out if not, see #1631) + // CopyableThrowable has control over its message and thus can modify it the way it wants + if (exception !is CopyableThrowable<*> && newException.message != exception.message) return null + return newException +} + /* * Here we partially copy original exception stackTrace to make current one much prettier. * E.g. for diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testCancelledOffer.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testCancelledOffer.txt deleted file mode 100644 index cfed5af4..00000000 --- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testCancelledOffer.txt +++ /dev/null @@ -1,10 +0,0 @@ -kotlinx.coroutines.JobCancellationException: Job was cancelled; job=JobImpl{Cancelling}@2a06d350 - (Coroutine boundary) - at kotlinx.coroutines.channels.AbstractSendChannel.offer(AbstractChannel.kt:170) - at kotlinx.coroutines.channels.ChannelCoroutine.offer(ChannelCoroutine.kt) - at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testCancelledOffer$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:153) -Caused by: kotlinx.coroutines.JobCancellationException: Job was cancelled; job=JobImpl{Cancelling}@2a06d350 - at kotlinx.coroutines.JobSupport.cancel(JobSupport.kt:599) - at kotlinx.coroutines.Job$DefaultImpls.cancel$default(Job.kt:164) - at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testCancelledOffer$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:151) - at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testReceiveOrNullFromClosedChannel.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testReceiveOrNullFromClosedChannel.txt deleted file mode 100644 index ac8f5f4e..00000000 --- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testReceiveOrNullFromClosedChannel.txt +++ /dev/null @@ -1,8 +0,0 @@ -kotlinx.coroutines.RecoverableTestException - at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testReceiveOrNullFromClosedChannel$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:43) - (Coroutine boundary) - at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest.channelReceiveOrNull(StackTraceRecoveryChannelsTest.kt:70) - at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testReceiveOrNullFromClosedChannel$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:44) -Caused by: kotlinx.coroutines.RecoverableTestException - at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testReceiveOrNullFromClosedChannel$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:43) - at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
\ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/select/testSelectJoin.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/select/testSelectJoin.txt index 2d480861..3bfd08e5 100644 --- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/select/testSelectJoin.txt +++ b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/select/testSelectJoin.txt @@ -1,7 +1,7 @@ kotlinx.coroutines.RecoverableTestException - at kotlinx.coroutines.exceptions.StackTraceRecoverySelectTest$doSelect$$inlined$select$lambda$1.invokeSuspend(StackTraceRecoverySelectTest.kt:33) + at kotlinx.coroutines.exceptions.StackTraceRecoverySelectTest$doSelect$2$1.invokeSuspend(StackTraceRecoverySelectTest.kt) (Coroutine boundary) - at kotlinx.coroutines.exceptions.StackTraceRecoverySelectTest$testSelectJoin$1.invokeSuspend(StackTraceRecoverySelectTest.kt:20) + at kotlinx.coroutines.exceptions.StackTraceRecoverySelectTest$testSelectJoin$1.invokeSuspend(StackTraceRecoverySelectTest.kt) Caused by: kotlinx.coroutines.RecoverableTestException - at kotlinx.coroutines.exceptions.StackTraceRecoverySelectTest$doSelect$$inlined$select$lambda$1.invokeSuspend(StackTraceRecoverySelectTest.kt:33) - at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
\ No newline at end of file + at kotlinx.coroutines.exceptions.StackTraceRecoverySelectTest$doSelect$2$1.invokeSuspend(StackTraceRecoverySelectTest.kt) + at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt) diff --git a/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt index 5ba7acf9..2b4e91c0 100644 --- a/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt +++ b/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -15,7 +15,7 @@ abstract class AbstractLincheckTest : VerifierState() { open fun StressOptions.customize(isStressTest: Boolean): StressOptions = this @Test - fun modelCheckingTest() = ModelCheckingOptions() + open fun modelCheckingTest() = ModelCheckingOptions() .iterations(if (isStressTest) 100 else 20) .invocationsPerIteration(if (isStressTest) 10_000 else 1_000) .commonConfiguration() @@ -38,4 +38,4 @@ abstract class AbstractLincheckTest : VerifierState() { .customize(isStressTest) override fun extractState(): Any = error("Not implemented") -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt b/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt index 55c05c55..c7c2c04e 100644 --- a/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -52,4 +52,4 @@ class CancelledAwaitStressTest : TestBase() { private fun keepMe(a: ByteArray) { // does nothing, makes sure the variable is kept in state-machine } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt new file mode 100644 index 00000000..dbe9cb37 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt @@ -0,0 +1,44 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import org.junit.Test +import java.lang.Runnable +import java.util.concurrent.* +import kotlin.test.* + +class ExecutorAsCoroutineDispatcherDelayTest : TestBase() { + + private var callsToSchedule = 0 + + private inner class STPE : ScheduledThreadPoolExecutor(1) { + override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> { + if (delay != 0L) ++callsToSchedule + return super.schedule(command, delay, unit) + } + } + + private inner class SES : ScheduledExecutorService by STPE() + + @Test + fun testScheduledThreadPool() = runTest { + val executor = STPE() + withContext(executor.asCoroutineDispatcher()) { + delay(100) + } + executor.shutdown() + assertEquals(1, callsToSchedule) + } + + @Test + fun testScheduledExecutorService() = runTest { + val executor = SES() + withContext(executor.asCoroutineDispatcher()) { + delay(100) + } + executor.shutdown() + assertEquals(1, callsToSchedule) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt b/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt index 15cb83ce..8a7878c9 100644 --- a/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt +++ b/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt @@ -70,8 +70,18 @@ class FailFastOnStartTest : TestBase() { val actor = actor<Int>(Dispatchers.Main, start = CoroutineStart.LAZY) { fail() } actor.send(1) } - + private fun mainException(e: Throwable): Boolean { return e is IllegalStateException && e.message?.contains("Module with the Main dispatcher is missing") ?: false } + + @Test + fun testProduceNonChild() = runTest(expected = ::mainException) { + produce<Int>(Job() + Dispatchers.Main) { fail() } + } + + @Test + fun testAsyncNonChild() = runTest(expected = ::mainException) { + async<Int>(Job() + Dispatchers.Main) { fail() } + } } diff --git a/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt b/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt index c9f722a5..04b0ba54 100644 --- a/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt +++ b/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt @@ -33,7 +33,7 @@ class FailingCoroutinesMachineryTest( private var caught: Throwable? = null private val latch = CountDownLatch(1) - private var exceptionHandler = CoroutineExceptionHandler { _, t -> caught = t;latch.countDown() } + private var exceptionHandler = CoroutineExceptionHandler { _, t -> caught = t; latch.countDown() } private val lazyOuterDispatcher = lazy { newFixedThreadPoolContext(1, "") } private object FailingUpdate : ThreadContextElement<Unit> { @@ -115,14 +115,20 @@ class FailingCoroutinesMachineryTest( @Test fun testElement() = runTest { - launch(NonCancellable + dispatcher.value + exceptionHandler + element) {} + // Top-level throwing dispatcher may rethrow an exception right here + runCatching { + launch(NonCancellable + dispatcher.value + exceptionHandler + element) {} + } checkException() } @Test fun testNestedElement() = runTest { - launch(NonCancellable + dispatcher.value + exceptionHandler) { - launch(element) { } + // Top-level throwing dispatcher may rethrow an exception right here + runCatching { + launch(NonCancellable + dispatcher.value + exceptionHandler) { + launch(element) { } + } } checkException() } diff --git a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt index e8079ebd..c4232d6e 100644 --- a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt +++ b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -56,7 +56,7 @@ object FieldWalker { * Reflectively starts to walk through object graph and map to all the reached object to their path * in from root. Use [showPath] do display a path if needed. */ - private fun walkRefs(root: Any?, rootStatics: Boolean): Map<Any, Ref> { + private fun walkRefs(root: Any?, rootStatics: Boolean): IdentityHashMap<Any, Ref> { val visited = IdentityHashMap<Any, Ref>() if (root == null) return visited visited[root] = Ref.RootRef diff --git a/kotlinx-coroutines-core/jvm/test/IntellijIdeaDebuggerEvaluatorCompatibilityTest.kt b/kotlinx-coroutines-core/jvm/test/IntellijIdeaDebuggerEvaluatorCompatibilityTest.kt new file mode 100644 index 00000000..6bbfdd1b --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/IntellijIdeaDebuggerEvaluatorCompatibilityTest.kt @@ -0,0 +1,56 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import org.junit.Test +import kotlin.coroutines.* +import kotlin.test.* + +class IntellijIdeaDebuggerEvaluatorCompatibilityTest { + + /* + * This test verifies that our CoroutineScope is accessible to IDEA debugger. + * + * Consider the following scenario: + * ``` + * runBlocking<Unit> { // this: CoroutineScope + * println("runBlocking") + * } + * ``` + * user puts breakpoint to `println` line, opens "Evaluate" window + * and executes `launch { println("launch") }`. They (obviously) expect it to work, but + * it won't: `{}` in `runBlocking` is `SuspendLambda` and `this` is an unused implicit receiver + * that is removed by the compiler (because it's unused). + * + * But we still want to provide consistent user experience for functions with `CoroutineScope` receiver, + * for that IDEA debugger tries to retrieve the scope via `kotlin.coroutines.coroutineContext[Job] as? CoroutineScope` + * and with this test we're fixing this behaviour. + * + * Note that this behaviour is not carved in stone: IDEA fallbacks to `kotlin.coroutines.coroutineContext` for the context if necessary. + */ + + @Test + fun testScopeIsAccessible() = runBlocking<Unit> { + verify() + + withContext(Job()) { + verify() + } + + coroutineScope { + verify() + } + + supervisorScope { + verify() + } + + } + + private suspend fun verify() { + val ctx = coroutineContext + assertTrue { ctx.job is CoroutineScope } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/JoinStrTest.kt b/kotlinx-coroutines-core/jvm/test/JoinStressTest.kt index 5090e7c0..6d474185 100644 --- a/kotlinx-coroutines-core/jvm/test/JoinStrTest.kt +++ b/kotlinx-coroutines-core/jvm/test/JoinStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt new file mode 100644 index 00000000..8a20e084 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.channels.* +import org.junit.Test +import kotlin.test.* + +class ReusableCancellableContinuationLeakStressTest : TestBase() { + + @Suppress("UnnecessaryVariable") + private suspend fun <T : Any> ReceiveChannel<T>.receiveBatch(): T { + val r = receive() // DO NOT MERGE LINES, otherwise TCE will kick in + return r + } + + private val iterations = 100_000 * stressTestMultiplier + + class Leak(val i: Int) + + @Test // Simplified version of #2564 + fun testReusableContinuationLeak() = runTest { + val channel = produce(capacity = 1) { // from the main thread + (0 until iterations).forEach { + send(Leak(it)) + } + } + + launch(Dispatchers.Default) { + repeat (iterations) { + val value = channel.receiveBatch() + assertEquals(it, value.i) + } + (channel as Job).join() + + FieldWalker.assertReachableCount(0, coroutineContext.job, false) { it is Leak } + } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt index 56f1e283..06839f4a 100644 --- a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt @@ -39,7 +39,7 @@ class ReusableCancellableContinuationTest : TestBase() { repeat(iterations) { suspender { - assertTrue(channel.offer(it)) + assertTrue(channel.trySend(it).isSuccess) } } channel.close() diff --git a/kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt new file mode 100644 index 00000000..a256815d --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.flow.* +import org.junit.* + +class ReusableContinuationStressTest : TestBase() { + + private val iterations = 1000 * stressTestMultiplierSqrt + + @Test // Originally reported by @denis-bezrukov in #2736 + fun testDebounceWithStateFlow() = runBlocking<Unit> { + withContext(Dispatchers.Default) { + repeat(iterations) { + launch { // <- load the dispatcher and OS scheduler + runStressTestOnce(1, 1) + } + } + } + } + + private suspend fun runStressTestOnce(delay: Int, debounce: Int) = coroutineScope { + val stateFlow = MutableStateFlow(0) + val emitter = launch { + repeat(1000) { i -> + stateFlow.emit(i) + delay(delay.toLong()) + } + } + var last = 0 + stateFlow.debounce(debounce.toLong()).take(100).collect { i -> + if (i - last > 100) { + last = i + } + } + emitter.cancel() + } +} diff --git a/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt b/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt index e20362ff..de38df6b 100644 --- a/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -171,4 +171,15 @@ class RunBlockingTest : TestBase() { } rb.hashCode() // unused } + + @Test + fun testCancelledParent() { + val job = Job() + job.cancel() + assertFailsWith<CancellationException> { + runBlocking(job) { + expectUnreached() + } + } + } } diff --git a/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt b/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt index e755b17d..49c93c7f 100644 --- a/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt @@ -41,7 +41,7 @@ class RunInterruptibleTest : TestBase() { val job = launch { runInterruptible(Dispatchers.IO) { expect(2) - latch.offer(Unit) + latch.trySend(Unit) try { Thread.sleep(10_000L) expectUnreached() diff --git a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt index ca399f53..bd9a185f 100644 --- a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt +++ b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -42,7 +42,7 @@ private const val REAL_PARK_NANOS = 10_000_000L // 10 ms -- park for a little to @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") internal class VirtualTimeSource( private val log: PrintStream? -) : TimeSource { +) : AbstractTimeSource() { private val mainThread: Thread = Thread.currentThread() private var checkpointNanos: Long = System.nanoTime() @@ -142,7 +142,7 @@ internal class VirtualTimeSource( } private fun minParkedTill(): Long = - threads.values.map { if (it.permit) NOT_PARKED else it.parkedTill }.min() ?: NOT_PARKED + threads.values.map { if (it.permit) NOT_PARKED else it.parkedTill }.minOrNull() ?: NOT_PARKED @Synchronized fun shutdown() { diff --git a/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt index ae95e694..d3b2ff12 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -78,4 +78,14 @@ class ActorLazyTest : TestBase() { job.join() finish(5) } -}
\ No newline at end of file + + @Test + fun testCancelledParent() = runTest({ it is CancellationException }) { + cancel() + expect(1) + actor<Int>(start = CoroutineStart.LAZY) { + expectUnreached() + } + finish(2) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt index bdca5039..5a2778d5 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -69,11 +69,11 @@ class ActorTest(private val capacity: Int) : TestBase() { @Test fun testCloseWithoutCause() = runTest { val actor = actor<Int>(capacity = capacity) { - val element = channel.receiveOrNull() + val element = channel.receive() expect(2) assertEquals(42, element) - val next = channel.receiveOrNull() - assertNull(next) + val next = channel.receiveCatching() + assertNull(next.exceptionOrNull()) expect(3) } @@ -88,11 +88,11 @@ class ActorTest(private val capacity: Int) : TestBase() { @Test fun testCloseWithCause() = runTest { val actor = actor<Int>(capacity = capacity) { - val element = channel.receiveOrNull() + val element = channel.receive() expect(2) - require(element!! == 42) + require(element == 42) try { - channel.receiveOrNull() + channel.receive() } catch (e: IOException) { expect(3) } @@ -111,7 +111,7 @@ class ActorTest(private val capacity: Int) : TestBase() { val job = async { actor<Int>(capacity = capacity) { expect(1) - channel.receiveOrNull() + channel.receive() expectUnreached() } } @@ -173,11 +173,24 @@ class ActorTest(private val capacity: Int) : TestBase() { fun testCloseFreshActor() = runTest { for (start in CoroutineStart.values()) { val job = launch { - val actor = actor<Int>(start = start) { for (i in channel) {} } + val actor = actor<Int>(start = start) { + for (i in channel) { + } + } actor.close() } job.join() } } + + @Test + fun testCancelledParent() = runTest({ it is CancellationException }) { + cancel() + expect(1) + actor<Int> { + expectUnreached() + } + finish(2) + } } diff --git a/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt index 2e73b243..8c9777b4 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -67,10 +67,10 @@ class BroadcastChannelMultiReceiveStressTest( val channel = broadcast.openSubscription() when (receiverIndex % 5) { 0 -> doReceive(channel, receiverIndex) - 1 -> doReceiveOrNull(channel, receiverIndex) + 1 -> doReceiveCatching(channel, receiverIndex) 2 -> doIterator(channel, receiverIndex) 3 -> doReceiveSelect(channel, receiverIndex) - 4 -> doReceiveSelectOrNull(channel, receiverIndex) + 4 -> doReceiveCatchingSelect(channel, receiverIndex) } channel.cancel() } @@ -124,9 +124,9 @@ class BroadcastChannelMultiReceiveStressTest( } } - private suspend fun doReceiveOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) { + private suspend fun doReceiveCatching(channel: ReceiveChannel<Long>, receiverIndex: Int) { while (true) { - val stop = doReceived(receiverIndex, channel.receiveOrNull() ?: break) + val stop = doReceived(receiverIndex, channel.receiveCatching().getOrNull() ?: break) if (stop) break } } @@ -148,11 +148,11 @@ class BroadcastChannelMultiReceiveStressTest( } } - private suspend fun doReceiveSelectOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) { + private suspend fun doReceiveCatchingSelect(channel: ReceiveChannel<Long>, receiverIndex: Int) { while (true) { - val event = select<Long?> { channel.onReceiveOrNull { it } } ?: break + val event = select<Long?> { channel.onReceiveCatching { it.getOrNull() } } ?: break val stop = doReceived(receiverIndex, event) if (stop) break } } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt index 76713aa1..86adfee0 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -15,7 +15,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { // total counters private var sendCnt = 0 - private var offerFailedCnt = 0 + private var trySendFailedCnt = 0 private var receivedCnt = 0 private var undeliveredCnt = 0 @@ -23,7 +23,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { private var lastReceived = 0 private var dSendCnt = 0 private var dSendExceptionCnt = 0 - private var dOfferFailedCnt = 0 + private var dTrySendFailedCnt = 0 private var dReceivedCnt = 0 private val dUndeliveredCnt = AtomicInteger() @@ -43,30 +43,30 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { joinAll(j1, j2) // All elements must be either received or undelivered (IN every run) - if (dSendCnt - dOfferFailedCnt != dReceivedCnt + dUndeliveredCnt.get()) { + if (dSendCnt - dTrySendFailedCnt != dReceivedCnt + dUndeliveredCnt.get()) { println(" Send: $dSendCnt") - println("Send Exception: $dSendExceptionCnt") - println(" Offer failed: $dOfferFailedCnt") + println("Send exception: $dSendExceptionCnt") + println("trySend failed: $dTrySendFailedCnt") println(" Received: $dReceivedCnt") println(" Undelivered: ${dUndeliveredCnt.get()}") error("Failed") } - offerFailedCnt += dOfferFailedCnt + trySendFailedCnt += dTrySendFailedCnt receivedCnt += dReceivedCnt undeliveredCnt += dUndeliveredCnt.get() // clear for next run dSendCnt = 0 dSendExceptionCnt = 0 - dOfferFailedCnt = 0 + dTrySendFailedCnt = 0 dReceivedCnt = 0 dUndeliveredCnt.set(0) } // Stats - println(" Send: $sendCnt") - println(" Offer failed: $offerFailedCnt") - println(" Received: $receivedCnt") - println(" Undelivered: $undeliveredCnt") - assertEquals(sendCnt - offerFailedCnt, receivedCnt + undeliveredCnt) + println(" Send: $sendCnt") + println("trySend failed: $trySendFailedCnt") + println(" Received: $receivedCnt") + println(" Undelivered: $undeliveredCnt") + assertEquals(sendCnt - trySendFailedCnt, receivedCnt + undeliveredCnt) } private suspend fun sendOne(channel: Channel<Int>) { @@ -75,11 +75,11 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { try { when (Random.nextInt(2)) { 0 -> channel.send(i) - 1 -> if (!channel.offer(i)) { - dOfferFailedCnt++ + 1 -> if (!channel.trySend(i).isSuccess) { + dTrySendFailedCnt++ } } - } catch(e: Throwable) { + } catch (e: Throwable) { assertTrue(e is CancellationException) // the only exception possible in this test dSendExceptionCnt++ throw e @@ -89,7 +89,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { private suspend fun receiveOne(channel: Channel<Int>) { val received = when (Random.nextInt(3)) { 0 -> channel.receive() - 1 -> channel.receiveOrNull() ?: error("Cannot be closed yet") + 1 -> channel.receiveCatching().getOrElse { error("Cannot be closed yet") } 2 -> select { channel.onReceive { it } } @@ -99,4 +99,4 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { dReceivedCnt++ lastReceived = received } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt index f414c333..a6345cc5 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -60,10 +60,10 @@ class ChannelSendReceiveStressTest( launch(pool + CoroutineName("receiver$receiverIndex")) { when (receiverIndex % 5) { 0 -> doReceive(receiverIndex) - 1 -> doReceiveOrNull(receiverIndex) + 1 -> doReceiveCatching(receiverIndex) 2 -> doIterator(receiverIndex) 3 -> doReceiveSelect(receiverIndex) - 4 -> doReceiveSelectOrNull(receiverIndex) + 4 -> doReceiveCatchingSelect(receiverIndex) } receiversCompleted.incrementAndGet() } @@ -152,9 +152,9 @@ class ChannelSendReceiveStressTest( } } - private suspend fun doReceiveOrNull(receiverIndex: Int) { + private suspend fun doReceiveCatching(receiverIndex: Int) { while (true) { - doReceived(receiverIndex, channel.receiveOrNull() ?: break) + doReceived(receiverIndex, channel.receiveCatching().getOrNull() ?: break) } } @@ -173,10 +173,10 @@ class ChannelSendReceiveStressTest( } } - private suspend fun doReceiveSelectOrNull(receiverIndex: Int) { + private suspend fun doReceiveCatchingSelect(receiverIndex: Int) { while (true) { - val event = select<Int?> { channel.onReceiveOrNull { it } } ?: break + val event = select<Int?> { channel.onReceiveCatching { it.getOrNull() } } ?: break doReceived(receiverIndex, event) } } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt index 1188329a..12334326 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -68,7 +68,7 @@ class ChannelUndeliveredElementStressTest(private val kind: TestChannelKind) : T try { block() } finally { - if (!done.offer(true)) + if (!done.trySend(true).isSuccess) error(IllegalStateException("failed to offer to done channel")) } } @@ -188,9 +188,9 @@ class ChannelUndeliveredElementStressTest(private val kind: TestChannelKind) : T val receivedData = when (receiveMode) { 1 -> channel.receive() 2 -> select { channel.onReceive { it } } - 3 -> channel.receiveOrNull() ?: error("Should not be closed") - 4 -> select { channel.onReceiveOrNull { it ?: error("Should not be closed") } } - 5 -> channel.receiveOrClosed().value + 3 -> channel.receiveCatching().getOrElse { error("Should not be closed") } + 4 -> select { channel.onReceiveCatching { it.getOrElse { error("Should not be closed") } } } + 5 -> channel.receiveCatching().getOrThrow() 6 -> { val iterator = channel.iterator() check(iterator.hasNext()) { "Should not be closed" } diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt index da20f0c5..8512aebc 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt @@ -11,7 +11,7 @@ import kotlin.test.* class ChannelsJvmTest : TestBase() { @Test - fun testBlocking() { + fun testTrySendBlocking() { val ch = Channel<Int>() val sum = GlobalScope.async { var sum = 0 @@ -19,9 +19,36 @@ class ChannelsJvmTest : TestBase() { sum } repeat(10) { - ch.sendBlocking(it) + assertTrue(ch.trySendBlocking(it).isSuccess) } ch.close() assertEquals(45, runBlocking { sum.await() }) } + + @Test + fun testTrySendBlockingClosedChannel() { + run { + val channel = Channel<Unit>().also { it.close() } + channel.trySendBlocking(Unit) + .onSuccess { expectUnreached() } + .onFailure { assertTrue(it is ClosedSendChannelException) } + .also { assertTrue { it.isClosed } } + } + + run { + val channel = Channel<Unit>().also { it.close(TestException()) } + channel.trySendBlocking(Unit) + .onSuccess { expectUnreached() } + .onFailure { assertTrue(it is TestException) } + .also { assertTrue { it.isClosed } } + } + + run { + val channel = Channel<Unit>().also { it.cancel(TestCancellationException()) } + channel.trySendBlocking(Unit) + .onSuccess { expectUnreached() } + .onFailure { assertTrue(it is TestCancellationException) } + .also { assertTrue { it.isClosed } } + } + } } diff --git a/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt index eb7be575..2b3c05bc 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt @@ -29,7 +29,7 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() { launch(Dispatchers.Default + CoroutineName("Sender$senderId")) { repeat(nEvents) { i -> if (i % nSenders == senderId) { - broadcast.offer(i) + broadcast.trySend(i) sentTotal.incrementAndGet() yield() } @@ -63,7 +63,7 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() { try { withTimeout(timeLimit) { senders.forEach { it.join() } - broadcast.offer(nEvents) // last event to signal receivers termination + broadcast.trySend(nEvents) // last event to signal receivers termination receivers.forEach { it.join() } } } catch (e: CancellationException) { @@ -86,4 +86,4 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() { cancel() value } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt index 316b3785..793d7e44 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt @@ -1,15 +1,12 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels import kotlinx.coroutines.* -import org.junit.After -import org.junit.Test -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.atomic.AtomicReference -import kotlin.coroutines.* +import org.junit.* +import java.util.concurrent.atomic.* class ConflatedChannelCloseStressTest : TestBase() { @@ -37,12 +34,9 @@ class ConflatedChannelCloseStressTest : TestBase() { var x = senderId try { while (isActive) { - try { - curChannel.get().offer(x) + curChannel.get().trySend(x).onSuccess { x += nSenders sent.incrementAndGet() - } catch (e: ClosedSendChannelException) { - // ignore } } } finally { @@ -64,7 +58,9 @@ class ConflatedChannelCloseStressTest : TestBase() { } val receiver = async(pool + NonCancellable) { while (isActive) { - curChannel.get().receiveOrNull() + curChannel.get().receiveCatching().getOrElse { + it?.let { throw it } + } received.incrementAndGet() } } @@ -110,4 +106,4 @@ class ConflatedChannelCloseStressTest : TestBase() { } class StopException : Exception() -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt index 51789078..fbc28a18 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -48,7 +48,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() delayChannel.cancel() delay(5100) - assertFailsWith<CancellationException> { delayChannel.poll() } + assertFailsWith<CancellationException> { delayChannel.tryReceive().getOrThrow() } } } @@ -112,13 +112,13 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() var sum = 0 var n = 0 whileSelect { - this@averageInTimeWindow.onReceiveOrClosed { + this@averageInTimeWindow.onReceiveCatching { if (it.isClosed) { // Send leftovers and bail out if (n != 0) send(sum / n.toDouble()) false } else { - sum += it.value + sum += it.getOrThrow() ++n true } @@ -159,9 +159,9 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() } } -fun ReceiveChannel<Unit>.checkEmpty() = assertNull(poll()) +fun ReceiveChannel<Unit>.checkEmpty() = assertNull(tryReceive().getOrNull()) fun ReceiveChannel<Unit>.checkNotEmpty() { - assertNotNull(poll()) - assertNull(poll()) + assertNotNull(tryReceive().getOrNull()) + assertNull(tryReceive().getOrNull()) } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt index f52f8b5b..2d8c0ebc 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.exceptions @@ -38,13 +38,6 @@ class StackTraceRecoveryChannelsTest : TestBase() { } @Test - fun testReceiveOrNullFromClosedChannel() = runTest { - val channel = Channel<Int>() - channel.close(RecoverableTestException()) - channelReceiveOrNull(channel) - } - - @Test fun testSendToClosedChannel() = runTest { val channel = Channel<Int>() channel.close(RecoverableTestException()) @@ -67,7 +60,6 @@ class StackTraceRecoveryChannelsTest : TestBase() { } private suspend fun channelReceive(channel: Channel<Int>) = channelOp { channel.receive() } - private suspend fun channelReceiveOrNull(channel: Channel<Int>) = channelOp { channel.receiveOrNull() } private suspend inline fun channelOp(block: () -> Unit) { try { @@ -145,25 +137,6 @@ class StackTraceRecoveryChannelsTest : TestBase() { deferred.await() } - // See https://github.com/Kotlin/kotlinx.coroutines/issues/950 - @Test - fun testCancelledOffer() = runTest { - expect(1) - val job = Job() - val actor = actor<Int>(job, Channel.UNLIMITED) { - consumeEach { - expectUnreached() // is cancelled before offer - } - } - job.cancel() - try { - actor.offer(1) - } catch (e: Exception) { - verifyStackTrace("channels/${name.methodName}", e) - finish(2) - } - } - private suspend fun Channel<Int>.sendWithContext(ctx: CoroutineContext) = withContext(ctx) { sendInChannel() yield() // TCE @@ -177,4 +150,4 @@ class StackTraceRecoveryChannelsTest : TestBase() { private suspend fun Channel<Int>.sendFromScope() = coroutineScope { sendWithContext(wrapperDispatcher(coroutineContext)) } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt index 70336659..dba738a8 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.exceptions import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import org.junit.Test import kotlin.test.* @@ -71,4 +72,56 @@ class StackTraceRecoveryCustomExceptionsTest : TestBase() { assertEquals("custom", cause.message) } } + + class WrongMessageException(token: String) : RuntimeException("Token $token") + + @Test + fun testWrongMessageException() = runTest { + val result = runCatching { + coroutineScope<Unit> { + throw WrongMessageException("OK") + } + } + val ex = result.exceptionOrNull() ?: error("Expected to fail") + assertTrue(ex is WrongMessageException) + assertEquals("Token OK", ex.message) + } + + @Test + fun testWrongMessageExceptionInChannel() = runTest { + val result = produce<Unit>(SupervisorJob() + Dispatchers.Unconfined) { + throw WrongMessageException("OK") + } + val ex = runCatching { + @Suppress("ControlFlowWithEmptyBody") + for (unit in result) { + // Iterator has a special code path + } + }.exceptionOrNull() ?: error("Expected to fail") + assertTrue(ex is WrongMessageException) + assertEquals("Token OK", ex.message) + } + + class CopyableWithCustomMessage( + message: String?, + cause: Throwable? = null + ) : RuntimeException(message, cause), + CopyableThrowable<CopyableWithCustomMessage> { + + override fun createCopy(): CopyableWithCustomMessage { + return CopyableWithCustomMessage("Recovered: [$message]", cause) + } + } + + @Test + fun testCustomCopyableMessage() = runTest { + val result = runCatching { + coroutineScope<Unit> { + throw CopyableWithCustomMessage("OK") + } + } + val ex = result.exceptionOrNull() ?: error("Expected to fail") + assertTrue(ex is CopyableWithCustomMessage) + assertEquals("Recovered: [OK]", ex.message) + } } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt index bea18a43..a85bb7a2 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt @@ -86,7 +86,6 @@ class StackTraceRecoveryNestedScopesTest : TestBase() { "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$callWithTimeout\$2.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:37)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$callCoroutineScope\$2.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:43)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$testAwaitNestedScopes\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:68)\n" + - "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest.verifyAwait(StackTraceRecoveryNestedScopesTest.kt:76)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$testAwaitNestedScopes\$1.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:71)\n" + "Caused by: kotlinx.coroutines.RecoverableTestException\n" + diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt index 5073b7fd..02607c03 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt @@ -58,7 +58,7 @@ class StackTraceRecoveryNestedTest : TestBase() { try { rootAsync.awaitRootLevel() } catch (e: RecoverableTestException) { - e.verifyException("await\$suspendImpl", "awaitRootLevel") + e.verifyException("awaitRootLevel") finish(8) } } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt index 290420e4..0d7648c5 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt @@ -45,9 +45,9 @@ class StackTraceRecoverySelectTest : TestBase() { private suspend fun doSelectAwait(deferred: Deferred<Unit>): Int { return select { deferred.onAwait { - yield() // Hide the stackstrace + yield() // Hide the frame 42 } } } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt index dbbd77c4..0a8b6530 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt @@ -34,14 +34,13 @@ class StackTraceRecoveryTest : TestBase() { val deferred = createDeferred(3) val traces = listOf( "java.util.concurrent.ExecutionException\n" + - "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$1\$1.invokeSuspend(StackTraceRecoveryTest.kt:99)\n" + + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$createDeferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:99)\n" + "\t(Coroutine boundary)\n" + - "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.oneMoreNestedMethod(StackTraceRecoveryTest.kt:49)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.nestedMethod(StackTraceRecoveryTest.kt:44)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1.invokeSuspend(StackTraceRecoveryTest.kt:17)\n", "Caused by: java.util.concurrent.ExecutionException\n" + - "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$1\$1.invokeSuspend(StackTraceRecoveryTest.kt:21)\n" + + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$createDeferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:21)\n" + "\tat kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)\n" ) nestedMethod(deferred, *traces.toTypedArray()) @@ -59,7 +58,6 @@ class StackTraceRecoveryTest : TestBase() { "java.util.concurrent.ExecutionException\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testCompletedAsync\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:44)\n" + "\t(Coroutine boundary)\n" + - "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.oneMoreNestedMethod(StackTraceRecoveryTest.kt:81)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.nestedMethod(StackTraceRecoveryTest.kt:75)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testCompletedAsync\$1.invokeSuspend(StackTraceRecoveryTest.kt:71)", @@ -94,7 +92,6 @@ class StackTraceRecoveryTest : TestBase() { "kotlinx.coroutines.RecoverableTestException\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testWithContext\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:143)\n" + "\t(Coroutine boundary)\n" + - "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.innerMethod(StackTraceRecoveryTest.kt:158)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$outerMethod\$2.invokeSuspend(StackTraceRecoveryTest.kt:151)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.outerMethod(StackTraceRecoveryTest.kt:150)\n" + @@ -132,7 +129,6 @@ class StackTraceRecoveryTest : TestBase() { "kotlinx.coroutines.RecoverableTestException\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testCoroutineScope\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:143)\n" + "\t(Coroutine boundary)\n" + - "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.innerMethod(StackTraceRecoveryTest.kt:158)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$outerScopedMethod\$2\$1.invokeSuspend(StackTraceRecoveryTest.kt:193)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$outerScopedMethod\$2.invokeSuspend(StackTraceRecoveryTest.kt:151)\n" + @@ -228,13 +224,13 @@ class StackTraceRecoveryTest : TestBase() { val e = exception assertNotNull(e) verifyStackTrace(e, "kotlinx.coroutines.RecoverableTestException\n" + - "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.throws(StackTraceRecoveryTest.kt:280)\n" + - "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$throws\$1.invokeSuspend(StackTraceRecoveryTest.kt)\n" + - "\t(Coroutine boundary)\n" + - "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" + - "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.awaiter(StackTraceRecoveryTest.kt:285)\n" + - "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testNonDispatchedRecovery\$await\$1.invokeSuspend(StackTraceRecoveryTest.kt:291)\n" + - "Caused by: kotlinx.coroutines.RecoverableTestException") + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.throws(StackTraceRecoveryTest.kt:280)\n" + + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.access\$throws(StackTraceRecoveryTest.kt:20)\n" + + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$throws\$1.invokeSuspend(StackTraceRecoveryTest.kt)\n" + + "\t(Coroutine boundary)\n" + + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.awaiter(StackTraceRecoveryTest.kt:285)\n" + + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testNonDispatchedRecovery\$await\$1.invokeSuspend(StackTraceRecoveryTest.kt:291)\n" + + "Caused by: kotlinx.coroutines.RecoverableTestException") } private class Callback(val cont: CancellableContinuation<*>) @@ -261,22 +257,8 @@ class StackTraceRecoveryTest : TestBase() { private suspend fun awaitCallback(channel: Channel<Callback>) { suspendCancellableCoroutine<Unit> { cont -> - channel.offer(Callback(cont)) + channel.trySend(Callback(cont)) } yield() // nop to make sure it is not a tail call } - - @Test - fun testWrongMessageException() = runTest { - val result = runCatching { - coroutineScope<Unit> { - throw WrongMessageException("OK") - } - } - val ex = result.exceptionOrNull() ?: error("Expected to fail") - assertTrue(ex is WrongMessageException) - assertEquals("Token OK", ex.message) - } - - public class WrongMessageException(token: String) : RuntimeException("Token $token") } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt b/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt index 6034fccb..edce175d 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.exceptions @@ -15,8 +15,8 @@ class SuppressionTests : TestBase() { @Test fun testNotificationsWithException() = runTest { expect(1) - val coroutineContext = kotlin.coroutines.coroutineContext // workaround for KT-22984 - val coroutine = object : AbstractCoroutine<String>(coroutineContext, false) { + val coroutineContext = kotlin.coroutines.coroutineContext + NonCancellable // workaround for KT-22984 + val coroutine = object : AbstractCoroutine<String>(coroutineContext, true, false) { override fun onStart() { expect(3) } @@ -82,4 +82,4 @@ class SuppressionTests : TestBase() { assertTrue(e.cause!!.suppressed.isEmpty()) } } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt index e3db2626..f1be284c 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt @@ -36,7 +36,7 @@ class CallbackFlowTest : TestBase() { fun testThrowingConsumer() = runTest { var i = 0 val api = CallbackApi { - runCatching { it.offer(++i) } + it.trySend(++i) } val flow = callbackFlow<Int> { @@ -77,13 +77,13 @@ class CallbackFlowTest : TestBase() { var i = 0 val api = CallbackApi { if (i < 5) { - it.offer(++i) + it.trySend(++i) } else { it.close(RuntimeException()) } } - val flow = callbackFlow<Int>() { + val flow = callbackFlow<Int> { api.start(channel) awaitClose { api.stop() diff --git a/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt b/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt new file mode 100644 index 00000000..98240fc9 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt @@ -0,0 +1,61 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.internal.* +import org.junit.* + +/** + * Tests that shared flows keep strong reference to their source flows. + * See https://github.com/Kotlin/kotlinx.coroutines/issues/2557 + */ +@OptIn(DelicateCoroutinesApi::class) +class SharingReferenceTest : TestBase() { + private val token = object {} + + /* + * Single-threaded executor that we are using to ensure that the flow being sharing actually + * suspended (spilled its locals, attached to parent), so we can verify reachability. + * Without that, it's possible to have a situation where target flow is still + * being strongly referenced (by its dispatcher), but the test already tries to test reachability and fails. + */ + @get:Rule + val executor = ExecutorRule(1) + + private val weakEmitter = flow { + emit(null) + // suspend forever without keeping a strong reference to continuation -- this is a model of + // a callback API that does not keep a strong reference it is listeners, but works + suspendCancellableCoroutine<Unit> { } + // using the token here to make it easily traceable + emit(token) + } + + @Test + fun testShareInReference() { + val flow = weakEmitter.shareIn(ContextScope(executor), SharingStarted.Eagerly, 0) + linearize() + FieldWalker.assertReachableCount(1, flow) { it === token } + } + + @Test + fun testStateInReference() { + val flow = weakEmitter.stateIn(ContextScope(executor), SharingStarted.Eagerly, null) + linearize() + FieldWalker.assertReachableCount(1, flow) { it === token } + } + + @Test + fun testStateInSuspendingReference() = runTest { + val flow = weakEmitter.stateIn(GlobalScope) + linearize() + FieldWalker.assertReachableCount(1, flow) { it === token } + } + + private fun linearize() { + runBlocking(executor) { } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt index dc3cd43c..e55eaad1 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt @@ -62,7 +62,7 @@ class StateFlowStressTest : TestBase() { for (second in 1..nSeconds) { delay(1000) val cs = collected.map { it.sum() } - println("$second: emitted=${emitted.sum()}, collected=${cs.min()}..${cs.max()}") + println("$second: emitted=${emitted.sum()}, collected=${cs.minOrNull()}..${cs.maxOrNull()}") } emitters.cancelAndJoin() collectors.cancelAndJoin() @@ -77,4 +77,4 @@ class StateFlowStressTest : TestBase() { @Test fun testTenEmittersAndCollectors() = stress(10, 10) -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt new file mode 100644 index 00000000..660ed0aa --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt @@ -0,0 +1,44 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import org.junit.* +import kotlin.test.* +import kotlin.test.Test + +class StateFlowUpdateStressTest : TestBase() { + private val iterations = 1_000_000 * stressTestMultiplier + + @get:Rule + public val executor = ExecutorRule(2) + + @Test + fun testUpdate() = doTest { update { it + 1 } } + + @Test + fun testUpdateAndGet() = doTest { updateAndGet { it + 1 } } + + @Test + fun testGetAndUpdate() = doTest { getAndUpdate { it + 1 } } + + private fun doTest(increment: MutableStateFlow<Int>.() -> Unit) = runTest { + val flow = MutableStateFlow(0) + val j1 = launch(Dispatchers.Default) { + repeat(iterations / 2) { + flow.increment() + } + } + + val j2 = launch(Dispatchers.Default) { + repeat(iterations / 2) { + flow.increment() + } + } + + joinAll(j1, j2) + assertEquals(iterations, flow.value) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt index f04b100a..529f8817 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt @@ -7,11 +7,10 @@ package kotlinx.coroutines.guide.exampleBasic01 import kotlinx.coroutines.* -fun main() { - GlobalScope.launch { // launch a new coroutine in background and continue +fun main() = runBlocking { // this: CoroutineScope + launch { // launch a new coroutine and continue delay(1000L) // non-blocking delay for 1 second (default time unit is ms) println("World!") // print after delay } - println("Hello,") // main thread continues while coroutine is delayed - Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive + println("Hello") // main coroutine continues while a previous one is delayed } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt index bfece26b..6bf2af4c 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt @@ -7,13 +7,13 @@ package kotlinx.coroutines.guide.exampleBasic02 import kotlinx.coroutines.* -fun main() { - GlobalScope.launch { // launch a new coroutine in background and continue - delay(1000L) - println("World!") - } - println("Hello,") // main thread continues here immediately - runBlocking { // but this expression blocks the main thread - delay(2000L) // ... while we delay for 2 seconds to keep JVM alive - } +fun main() = runBlocking { // this: CoroutineScope + launch { doWorld() } + println("Hello") +} + +// this is your first suspending function +suspend fun doWorld() { + delay(1000L) + println("World!") } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt index 8541f604..67b6894a 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt @@ -7,11 +7,14 @@ package kotlinx.coroutines.guide.exampleBasic03 import kotlinx.coroutines.* -fun main() = runBlocking<Unit> { // start main coroutine - GlobalScope.launch { // launch a new coroutine in background and continue +fun main() = runBlocking { + doWorld() +} + +suspend fun doWorld() = coroutineScope { // this: CoroutineScope + launch { delay(1000L) println("World!") } - println("Hello,") // main coroutine continues here immediately - delay(2000L) // delaying for 2 seconds to keep JVM alive + println("Hello") } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt index 69f82771..efac7085 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt @@ -7,11 +7,21 @@ package kotlinx.coroutines.guide.exampleBasic04 import kotlinx.coroutines.* +// Sequentially executes doWorld followed by "Done" fun main() = runBlocking { - val job = GlobalScope.launch { // launch a new coroutine and keep a reference to its Job + doWorld() + println("Done") +} + +// Concurrently executes both sections +suspend fun doWorld() = coroutineScope { // this: CoroutineScope + launch { + delay(2000L) + println("World 2") + } + launch { delay(1000L) - println("World!") + println("World 1") } - println("Hello,") - job.join() // wait until child coroutine completes + println("Hello") } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt index 9d530b5f..193f2cc3 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt @@ -7,10 +7,12 @@ package kotlinx.coroutines.guide.exampleBasic05 import kotlinx.coroutines.* -fun main() = runBlocking { // this: CoroutineScope - launch { // launch a new coroutine in the scope of runBlocking +fun main() = runBlocking { + val job = launch { // launch a new coroutine and keep a reference to its Job delay(1000L) println("World!") } - println("Hello,") + println("Hello") + job.join() // wait until child coroutine completes + println("Done") } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt index b53d3b89..24b890a0 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt @@ -7,21 +7,11 @@ package kotlinx.coroutines.guide.exampleBasic06 import kotlinx.coroutines.* -fun main() = runBlocking { // this: CoroutineScope - launch { - delay(200L) - println("Task from runBlocking") - } - - coroutineScope { // Creates a coroutine scope +fun main() = runBlocking { + repeat(100_000) { // launch a lot of coroutines launch { - delay(500L) - println("Task from nested launch") + delay(5000L) + print(".") } - - delay(100L) - println("Task from coroutine scope") // This line will be printed before the nested launch } - - println("Coroutine scope is over") // This line is not printed until the nested launch completes } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt deleted file mode 100644 index cd854ce8..00000000 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit. -package kotlinx.coroutines.guide.exampleBasic07 - -import kotlinx.coroutines.* - -fun main() = runBlocking { - launch { doWorld() } - println("Hello,") -} - -// this is your first suspending function -suspend fun doWorld() { - delay(1000L) - println("World!") -} diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-08.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-08.kt deleted file mode 100644 index 0a346e0b..00000000 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-08.kt +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit. -package kotlinx.coroutines.guide.exampleBasic08 - -import kotlinx.coroutines.* - -fun main() = runBlocking { - repeat(100_000) { // launch a lot of coroutines - launch { - delay(5000L) - print(".") - } - } -} diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-09.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-09.kt deleted file mode 100644 index c9783ee5..00000000 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-09.kt +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit. -package kotlinx.coroutines.guide.exampleBasic09 - -import kotlinx.coroutines.* - -fun main() = runBlocking { - GlobalScope.launch { - repeat(1000) { i -> - println("I'm sleeping $i ...") - delay(500L) - } - } - delay(1300L) // just quit after delay -} diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt index 312dc72b..35536a7d 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt @@ -23,10 +23,12 @@ fun main() { println("Completed in $time ms") } +@OptIn(DelicateCoroutinesApi::class) fun somethingUsefulOneAsync() = GlobalScope.async { doSomethingUsefulOne() } +@OptIn(DelicateCoroutinesApi::class) fun somethingUsefulTwoAsync() = GlobalScope.async { doSomethingUsefulTwo() } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt index e23eaf25..c6ad4516 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt @@ -10,9 +10,9 @@ import kotlinx.coroutines.* fun main() = runBlocking<Unit> { // launch a coroutine to process some kind of incoming request val request = launch { - // it spawns two other jobs, one with GlobalScope - GlobalScope.launch { - println("job1: I run in GlobalScope and execute independently!") + // it spawns two other jobs + launch(Job()) { + println("job1: I run in my own Job and execute independently!") delay(1000) println("job1: I am not affected by cancellation of the request") } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt index e08ddd08..24cbabe0 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.guide.exampleExceptions01 import kotlinx.coroutines.* +@OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val job = GlobalScope.launch { // root coroutine with launch println("Throwing exception from launch") diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt index 67fdaa71..c3ab68a5 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.guide.exampleExceptions02 import kotlinx.coroutines.* +@OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val handler = CoroutineExceptionHandler { _, exception -> println("CoroutineExceptionHandler got $exception") diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt index 9c9b43d2..b966c1ea 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.guide.exampleExceptions04 import kotlinx.coroutines.* +@OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val handler = CoroutineExceptionHandler { _, exception -> println("CoroutineExceptionHandler got $exception") diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt index 04f9385f..5f1f3d89 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt @@ -10,6 +10,7 @@ import kotlinx.coroutines.exceptions.* import kotlinx.coroutines.* import java.io.* +@OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val handler = CoroutineExceptionHandler { _, exception -> println("CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}") diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt index 5a5b276b..bc9f77b9 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt @@ -8,6 +8,7 @@ package kotlinx.coroutines.guide.exampleExceptions06 import kotlinx.coroutines.* import java.io.* +@OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val handler = CoroutineExceptionHandler { _, exception -> println("CoroutineExceptionHandler got $exception") diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt index 57fe6382..22380d3a 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt @@ -11,17 +11,21 @@ import kotlinx.coroutines.selects.* suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String = select<String> { - a.onReceiveOrNull { value -> - if (value == null) - "Channel 'a' is closed" - else + a.onReceiveCatching { it -> + val value = it.getOrNull() + if (value != null) { "a -> '$value'" + } else { + "Channel 'a' is closed" + } } - b.onReceiveOrNull { value -> - if (value == null) - "Channel 'b' is closed" - else + b.onReceiveCatching { it -> + val value = it.getOrNull() + if (value != null) { "b -> '$value'" + } else { + "Channel 'b' is closed" + } } } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt index 464e9b20..68b44564 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt @@ -13,12 +13,12 @@ fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = var current = input.receive() // start with first received deferred value while (isActive) { // loop while not cancelled/closed val next = select<Deferred<String>?> { // return next deferred value from this select or null - input.onReceiveOrNull { update -> - update // replaces next value to wait + input.onReceiveCatching { update -> + update.getOrNull() } - current.onAwait { value -> + current.onAwait { value -> send(value) // send value that current deferred has produced - input.receiveOrNull() // and use the next deferred from the input channel + input.receiveCatching().getOrNull() // and use the next deferred from the input channel } } if (next == null) { diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt index 765cd0b9..7e54fb1d 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt @@ -12,7 +12,7 @@ class BasicsGuideTest { @Test fun testExampleBasic01() { test("ExampleBasic01") { kotlinx.coroutines.guide.exampleBasic01.main() }.verifyLines( - "Hello,", + "Hello", "World!" ) } @@ -20,7 +20,7 @@ class BasicsGuideTest { @Test fun testExampleBasic02() { test("ExampleBasic02") { kotlinx.coroutines.guide.exampleBasic02.main() }.verifyLines( - "Hello,", + "Hello", "World!" ) } @@ -28,7 +28,7 @@ class BasicsGuideTest { @Test fun testExampleBasic03() { test("ExampleBasic03") { kotlinx.coroutines.guide.exampleBasic03.main() }.verifyLines( - "Hello,", + "Hello", "World!" ) } @@ -36,50 +36,26 @@ class BasicsGuideTest { @Test fun testExampleBasic04() { test("ExampleBasic04") { kotlinx.coroutines.guide.exampleBasic04.main() }.verifyLines( - "Hello,", - "World!" + "Hello", + "World 1", + "World 2", + "Done" ) } @Test fun testExampleBasic05() { test("ExampleBasic05") { kotlinx.coroutines.guide.exampleBasic05.main() }.verifyLines( - "Hello,", - "World!" + "Hello", + "World!", + "Done" ) } @Test fun testExampleBasic06() { - test("ExampleBasic06") { kotlinx.coroutines.guide.exampleBasic06.main() }.verifyLines( - "Task from coroutine scope", - "Task from runBlocking", - "Task from nested launch", - "Coroutine scope is over" - ) - } - - @Test - fun testExampleBasic07() { - test("ExampleBasic07") { kotlinx.coroutines.guide.exampleBasic07.main() }.verifyLines( - "Hello,", - "World!" - ) - } - - @Test - fun testExampleBasic08() { - test("ExampleBasic08") { kotlinx.coroutines.guide.exampleBasic08.main() }.also { lines -> + test("ExampleBasic06") { kotlinx.coroutines.guide.exampleBasic06.main() }.also { lines -> check(lines.size == 1 && lines[0] == ".".repeat(100_000)) } } - - @Test - fun testExampleBasic09() { - test("ExampleBasic09") { kotlinx.coroutines.guide.exampleBasic09.main() }.verifyLines( - "I'm sleeping 0 ...", - "I'm sleeping 1 ...", - "I'm sleeping 2 ..." - ) - } } diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt index d6f1c21d..1a84fb94 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt @@ -57,7 +57,7 @@ class DispatcherGuideTest { @Test fun testExampleContext06() { test("ExampleContext06") { kotlinx.coroutines.guide.exampleContext06.main() }.verifyLines( - "job1: I run in GlobalScope and execute independently!", + "job1: I run in my own Job and execute independently!", "job2: I am a child of the request coroutine", "job1: I am not affected by cancellation of the request", "main: Who has survived request cancellation?" diff --git a/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt b/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt new file mode 100644 index 00000000..30fbfee2 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt @@ -0,0 +1,51 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines // Trick to make guide tests use these declarations with executors that can be closed on our side implicitly + +import java.util.concurrent.* +import java.util.concurrent.atomic.* +import kotlin.coroutines.* + +internal fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher = ClosedAfterGuideTestDispatcher(1, name) + +internal fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher = + ClosedAfterGuideTestDispatcher(nThreads, name) + +internal class PoolThread( + @JvmField val dispatcher: ExecutorCoroutineDispatcher, // for debugging & tests + target: Runnable, name: String +) : Thread(target, name) { + init { + isDaemon = true + } +} + +private class ClosedAfterGuideTestDispatcher( + private val nThreads: Int, + private val name: String +) : ExecutorCoroutineDispatcher() { + private val threadNo = AtomicInteger() + + override val executor: Executor = + Executors.newScheduledThreadPool(nThreads, object : ThreadFactory { + override fun newThread(target: java.lang.Runnable): Thread { + return PoolThread( + this@ClosedAfterGuideTestDispatcher, + target, + if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet() + ) + } + }) + + override fun dispatch(context: CoroutineContext, block: Runnable) { + executor.execute(wrapTask(block)) + } + + override fun close() { + (executor as ExecutorService).shutdown() + } + + override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]" +} diff --git a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt index 7eda9043..2e61ec6b 100644 --- a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt +++ b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.knit @@ -11,8 +11,6 @@ import kotlinx.knit.test.* import java.util.concurrent.* import kotlin.test.* -fun wrapTask(block: Runnable) = kotlinx.coroutines.wrapTask(block) - // helper function to dump exception to stdout for ease of debugging failed tests private inline fun <T> outputException(name: String, block: () -> T): T = try { block() } @@ -176,4 +174,4 @@ private inline fun List<String>.verify(verification: () -> Unit) { } throw t } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt index fbd5c0d8..74cc1783 100644 --- a/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt +++ b/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt @@ -63,11 +63,12 @@ abstract class ChannelLincheckTestBase( } @Operation - fun offer(@Param(name = "value") value: Int): Any = try { - c.offer(value) - } catch (e: NumberedCancellationException) { - e.testResult - } + fun trySend(@Param(name = "value") value: Int): Any = c.trySend(value) + .onSuccess { return true } + .onFailure { + return if (it is NumberedCancellationException) it.testResult + else false + } // TODO: this operation should be (and can be!) linearizable, but is not // @Operation @@ -85,11 +86,10 @@ abstract class ChannelLincheckTestBase( } @Operation - fun poll(): Any? = try { - c.poll() - } catch (e: NumberedCancellationException) { - e.testResult - } + fun tryReceive(): Any? = + c.tryReceive() + .onSuccess { return it } + .onFailure { return if (it is NumberedCancellationException) it.testResult else null } // TODO: this operation should be (and can be!) linearizable, but is not // @Operation @@ -131,7 +131,7 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta private val buffer = ArrayList<Int>() private var closedMessage: String? = null - suspend fun send(x: Int): Any = when (val offerRes = offer(x)) { + suspend fun send(x: Int): Any = when (val offerRes = trySend(x)) { true -> Unit false -> suspendCancellableCoroutine { cont -> senders.add(cont to x) @@ -139,7 +139,7 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta else -> offerRes } - fun offer(element: Int): Any { + fun trySend(element: Int): Any { if (closedMessage !== null) return closedMessage!! if (capacity == CONFLATED) { if (resumeFirstReceiver(element)) return true @@ -163,11 +163,11 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta return false } - suspend fun receive(): Any = poll() ?: suspendCancellableCoroutine { cont -> + suspend fun receive(): Any = tryReceive() ?: suspendCancellableCoroutine { cont -> receivers.add(cont) } - fun poll(): Any? { + fun tryReceive(): Any? { if (buffer.isNotEmpty()) { val el = buffer.removeAt(0) resumeFirstSender().also { @@ -221,4 +221,4 @@ private fun <T> CancellableContinuation<T>.resume(res: T): Boolean { val token = tryResume(res) ?: return false completeResume(token) return true -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt index 6e350660..f7f59eef 100644 --- a/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt +++ b/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:Suppress("unused") package kotlinx.coroutines.lincheck @@ -9,10 +9,15 @@ import kotlinx.coroutines.sync.* import org.jetbrains.kotlinx.lincheck.* import org.jetbrains.kotlinx.lincheck.annotations.Operation import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.* +import org.junit.* class MutexLincheckTest : AbstractLincheckTest() { private val mutex = Mutex() + override fun modelCheckingTest() { + // Ignored via empty body as the only way + } + @Operation fun tryLock() = mutex.tryLock() @@ -29,4 +34,4 @@ class MutexLincheckTest : AbstractLincheckTest() { checkObstructionFreedom() override fun extractState() = mutex.isLocked -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt index 84ce773c..2b471d7f 100644 --- a/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt +++ b/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt @@ -16,7 +16,7 @@ abstract class SemaphoreLincheckTestBase(permits: Int) : AbstractLincheckTest() @Operation fun tryAcquire() = semaphore.tryAcquire() - @Operation(promptCancellation = true) + @Operation(promptCancellation = true, allowExtraSuspension = true) suspend fun acquire() = semaphore.acquire() @Operation(handleExceptionsAsResult = [IllegalStateException::class]) diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt index f31752c8..fe09440f 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt @@ -101,7 +101,7 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { firstBarrier.await() secondBarrier.await() blockingTasks.joinAll() - checkPoolThreadsCreated(21..22) + checkPoolThreadsCreated(21 /* blocking tasks + 1 for CPU */..20 + CORES_COUNT) } @Test @@ -122,7 +122,7 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { barrier.await() blockingTasks.joinAll() // There may be race when multiple CPU threads are trying to lazily created one more - checkPoolThreadsCreated(104..120) + checkPoolThreadsCreated(101..100 + CORES_COUNT) } @Test diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt index bfabf5b2..dd969bdd 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt @@ -39,17 +39,9 @@ abstract class SchedulerTestBase : TestBase() { ) } - /** - * Asserts that any number of pool worker threads in [range] exists at the time of method invocation - */ - fun checkPoolThreadsExist(range: IntRange) { - val threads = Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker }.count() - assertTrue(threads in range, "Expected threads in $range interval, but has $threads") - } - private fun maxSequenceNumber(): Int? { return Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker } - .map { sequenceNumber(it.name) }.max() + .map { sequenceNumber(it.name) }.maxOrNull() } private fun sequenceNumber(threadName: String): Int { @@ -105,4 +97,4 @@ abstract class SchedulerTestBase : TestBase() { } } } -}
\ No newline at end of file +} |