diff options
Diffstat (limited to 'kotlinx-coroutines-core/jvm/test')
62 files changed, 690 insertions, 340 deletions
diff --git a/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt index 5ba7acf9..2b4e91c0 100644 --- a/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt +++ b/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -15,7 +15,7 @@ abstract class AbstractLincheckTest : VerifierState() { open fun StressOptions.customize(isStressTest: Boolean): StressOptions = this @Test - fun modelCheckingTest() = ModelCheckingOptions() + open fun modelCheckingTest() = ModelCheckingOptions() .iterations(if (isStressTest) 100 else 20) .invocationsPerIteration(if (isStressTest) 10_000 else 1_000) .commonConfiguration() @@ -38,4 +38,4 @@ abstract class AbstractLincheckTest : VerifierState() { .customize(isStressTest) override fun extractState(): Any = error("Not implemented") -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt b/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt index 55c05c55..c7c2c04e 100644 --- a/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -52,4 +52,4 @@ class CancelledAwaitStressTest : TestBase() { private fun keepMe(a: ByteArray) { // does nothing, makes sure the variable is kept in state-machine } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt new file mode 100644 index 00000000..dbe9cb37 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt @@ -0,0 +1,44 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import org.junit.Test +import java.lang.Runnable +import java.util.concurrent.* +import kotlin.test.* + +class ExecutorAsCoroutineDispatcherDelayTest : TestBase() { + + private var callsToSchedule = 0 + + private inner class STPE : ScheduledThreadPoolExecutor(1) { + override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> { + if (delay != 0L) ++callsToSchedule + return super.schedule(command, delay, unit) + } + } + + private inner class SES : ScheduledExecutorService by STPE() + + @Test + fun testScheduledThreadPool() = runTest { + val executor = STPE() + withContext(executor.asCoroutineDispatcher()) { + delay(100) + } + executor.shutdown() + assertEquals(1, callsToSchedule) + } + + @Test + fun testScheduledExecutorService() = runTest { + val executor = SES() + withContext(executor.asCoroutineDispatcher()) { + delay(100) + } + executor.shutdown() + assertEquals(1, callsToSchedule) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt b/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt index 15cb83ce..8a7878c9 100644 --- a/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt +++ b/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt @@ -70,8 +70,18 @@ class FailFastOnStartTest : TestBase() { val actor = actor<Int>(Dispatchers.Main, start = CoroutineStart.LAZY) { fail() } actor.send(1) } - + private fun mainException(e: Throwable): Boolean { return e is IllegalStateException && e.message?.contains("Module with the Main dispatcher is missing") ?: false } + + @Test + fun testProduceNonChild() = runTest(expected = ::mainException) { + produce<Int>(Job() + Dispatchers.Main) { fail() } + } + + @Test + fun testAsyncNonChild() = runTest(expected = ::mainException) { + async<Int>(Job() + Dispatchers.Main) { fail() } + } } diff --git a/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt b/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt index c9f722a5..04b0ba54 100644 --- a/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt +++ b/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt @@ -33,7 +33,7 @@ class FailingCoroutinesMachineryTest( private var caught: Throwable? = null private val latch = CountDownLatch(1) - private var exceptionHandler = CoroutineExceptionHandler { _, t -> caught = t;latch.countDown() } + private var exceptionHandler = CoroutineExceptionHandler { _, t -> caught = t; latch.countDown() } private val lazyOuterDispatcher = lazy { newFixedThreadPoolContext(1, "") } private object FailingUpdate : ThreadContextElement<Unit> { @@ -115,14 +115,20 @@ class FailingCoroutinesMachineryTest( @Test fun testElement() = runTest { - launch(NonCancellable + dispatcher.value + exceptionHandler + element) {} + // Top-level throwing dispatcher may rethrow an exception right here + runCatching { + launch(NonCancellable + dispatcher.value + exceptionHandler + element) {} + } checkException() } @Test fun testNestedElement() = runTest { - launch(NonCancellable + dispatcher.value + exceptionHandler) { - launch(element) { } + // Top-level throwing dispatcher may rethrow an exception right here + runCatching { + launch(NonCancellable + dispatcher.value + exceptionHandler) { + launch(element) { } + } } checkException() } diff --git a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt index e8079ebd..c4232d6e 100644 --- a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt +++ b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -56,7 +56,7 @@ object FieldWalker { * Reflectively starts to walk through object graph and map to all the reached object to their path * in from root. Use [showPath] do display a path if needed. */ - private fun walkRefs(root: Any?, rootStatics: Boolean): Map<Any, Ref> { + private fun walkRefs(root: Any?, rootStatics: Boolean): IdentityHashMap<Any, Ref> { val visited = IdentityHashMap<Any, Ref>() if (root == null) return visited visited[root] = Ref.RootRef diff --git a/kotlinx-coroutines-core/jvm/test/IntellijIdeaDebuggerEvaluatorCompatibilityTest.kt b/kotlinx-coroutines-core/jvm/test/IntellijIdeaDebuggerEvaluatorCompatibilityTest.kt new file mode 100644 index 00000000..6bbfdd1b --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/IntellijIdeaDebuggerEvaluatorCompatibilityTest.kt @@ -0,0 +1,56 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import org.junit.Test +import kotlin.coroutines.* +import kotlin.test.* + +class IntellijIdeaDebuggerEvaluatorCompatibilityTest { + + /* + * This test verifies that our CoroutineScope is accessible to IDEA debugger. + * + * Consider the following scenario: + * ``` + * runBlocking<Unit> { // this: CoroutineScope + * println("runBlocking") + * } + * ``` + * user puts breakpoint to `println` line, opens "Evaluate" window + * and executes `launch { println("launch") }`. They (obviously) expect it to work, but + * it won't: `{}` in `runBlocking` is `SuspendLambda` and `this` is an unused implicit receiver + * that is removed by the compiler (because it's unused). + * + * But we still want to provide consistent user experience for functions with `CoroutineScope` receiver, + * for that IDEA debugger tries to retrieve the scope via `kotlin.coroutines.coroutineContext[Job] as? CoroutineScope` + * and with this test we're fixing this behaviour. + * + * Note that this behaviour is not carved in stone: IDEA fallbacks to `kotlin.coroutines.coroutineContext` for the context if necessary. + */ + + @Test + fun testScopeIsAccessible() = runBlocking<Unit> { + verify() + + withContext(Job()) { + verify() + } + + coroutineScope { + verify() + } + + supervisorScope { + verify() + } + + } + + private suspend fun verify() { + val ctx = coroutineContext + assertTrue { ctx.job is CoroutineScope } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/JoinStrTest.kt b/kotlinx-coroutines-core/jvm/test/JoinStressTest.kt index 5090e7c0..6d474185 100644 --- a/kotlinx-coroutines-core/jvm/test/JoinStrTest.kt +++ b/kotlinx-coroutines-core/jvm/test/JoinStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt new file mode 100644 index 00000000..8a20e084 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.channels.* +import org.junit.Test +import kotlin.test.* + +class ReusableCancellableContinuationLeakStressTest : TestBase() { + + @Suppress("UnnecessaryVariable") + private suspend fun <T : Any> ReceiveChannel<T>.receiveBatch(): T { + val r = receive() // DO NOT MERGE LINES, otherwise TCE will kick in + return r + } + + private val iterations = 100_000 * stressTestMultiplier + + class Leak(val i: Int) + + @Test // Simplified version of #2564 + fun testReusableContinuationLeak() = runTest { + val channel = produce(capacity = 1) { // from the main thread + (0 until iterations).forEach { + send(Leak(it)) + } + } + + launch(Dispatchers.Default) { + repeat (iterations) { + val value = channel.receiveBatch() + assertEquals(it, value.i) + } + (channel as Job).join() + + FieldWalker.assertReachableCount(0, coroutineContext.job, false) { it is Leak } + } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt index 56f1e283..06839f4a 100644 --- a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt @@ -39,7 +39,7 @@ class ReusableCancellableContinuationTest : TestBase() { repeat(iterations) { suspender { - assertTrue(channel.offer(it)) + assertTrue(channel.trySend(it).isSuccess) } } channel.close() diff --git a/kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt new file mode 100644 index 00000000..a256815d --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.flow.* +import org.junit.* + +class ReusableContinuationStressTest : TestBase() { + + private val iterations = 1000 * stressTestMultiplierSqrt + + @Test // Originally reported by @denis-bezrukov in #2736 + fun testDebounceWithStateFlow() = runBlocking<Unit> { + withContext(Dispatchers.Default) { + repeat(iterations) { + launch { // <- load the dispatcher and OS scheduler + runStressTestOnce(1, 1) + } + } + } + } + + private suspend fun runStressTestOnce(delay: Int, debounce: Int) = coroutineScope { + val stateFlow = MutableStateFlow(0) + val emitter = launch { + repeat(1000) { i -> + stateFlow.emit(i) + delay(delay.toLong()) + } + } + var last = 0 + stateFlow.debounce(debounce.toLong()).take(100).collect { i -> + if (i - last > 100) { + last = i + } + } + emitter.cancel() + } +} diff --git a/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt b/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt index e20362ff..de38df6b 100644 --- a/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -171,4 +171,15 @@ class RunBlockingTest : TestBase() { } rb.hashCode() // unused } + + @Test + fun testCancelledParent() { + val job = Job() + job.cancel() + assertFailsWith<CancellationException> { + runBlocking(job) { + expectUnreached() + } + } + } } diff --git a/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt b/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt index e755b17d..49c93c7f 100644 --- a/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt @@ -41,7 +41,7 @@ class RunInterruptibleTest : TestBase() { val job = launch { runInterruptible(Dispatchers.IO) { expect(2) - latch.offer(Unit) + latch.trySend(Unit) try { Thread.sleep(10_000L) expectUnreached() diff --git a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt index ca399f53..bd9a185f 100644 --- a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt +++ b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -42,7 +42,7 @@ private const val REAL_PARK_NANOS = 10_000_000L // 10 ms -- park for a little to @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") internal class VirtualTimeSource( private val log: PrintStream? -) : TimeSource { +) : AbstractTimeSource() { private val mainThread: Thread = Thread.currentThread() private var checkpointNanos: Long = System.nanoTime() @@ -142,7 +142,7 @@ internal class VirtualTimeSource( } private fun minParkedTill(): Long = - threads.values.map { if (it.permit) NOT_PARKED else it.parkedTill }.min() ?: NOT_PARKED + threads.values.map { if (it.permit) NOT_PARKED else it.parkedTill }.minOrNull() ?: NOT_PARKED @Synchronized fun shutdown() { diff --git a/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt index ae95e694..d3b2ff12 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -78,4 +78,14 @@ class ActorLazyTest : TestBase() { job.join() finish(5) } -}
\ No newline at end of file + + @Test + fun testCancelledParent() = runTest({ it is CancellationException }) { + cancel() + expect(1) + actor<Int>(start = CoroutineStart.LAZY) { + expectUnreached() + } + finish(2) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt index bdca5039..5a2778d5 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -69,11 +69,11 @@ class ActorTest(private val capacity: Int) : TestBase() { @Test fun testCloseWithoutCause() = runTest { val actor = actor<Int>(capacity = capacity) { - val element = channel.receiveOrNull() + val element = channel.receive() expect(2) assertEquals(42, element) - val next = channel.receiveOrNull() - assertNull(next) + val next = channel.receiveCatching() + assertNull(next.exceptionOrNull()) expect(3) } @@ -88,11 +88,11 @@ class ActorTest(private val capacity: Int) : TestBase() { @Test fun testCloseWithCause() = runTest { val actor = actor<Int>(capacity = capacity) { - val element = channel.receiveOrNull() + val element = channel.receive() expect(2) - require(element!! == 42) + require(element == 42) try { - channel.receiveOrNull() + channel.receive() } catch (e: IOException) { expect(3) } @@ -111,7 +111,7 @@ class ActorTest(private val capacity: Int) : TestBase() { val job = async { actor<Int>(capacity = capacity) { expect(1) - channel.receiveOrNull() + channel.receive() expectUnreached() } } @@ -173,11 +173,24 @@ class ActorTest(private val capacity: Int) : TestBase() { fun testCloseFreshActor() = runTest { for (start in CoroutineStart.values()) { val job = launch { - val actor = actor<Int>(start = start) { for (i in channel) {} } + val actor = actor<Int>(start = start) { + for (i in channel) { + } + } actor.close() } job.join() } } + + @Test + fun testCancelledParent() = runTest({ it is CancellationException }) { + cancel() + expect(1) + actor<Int> { + expectUnreached() + } + finish(2) + } } diff --git a/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt index 2e73b243..8c9777b4 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -67,10 +67,10 @@ class BroadcastChannelMultiReceiveStressTest( val channel = broadcast.openSubscription() when (receiverIndex % 5) { 0 -> doReceive(channel, receiverIndex) - 1 -> doReceiveOrNull(channel, receiverIndex) + 1 -> doReceiveCatching(channel, receiverIndex) 2 -> doIterator(channel, receiverIndex) 3 -> doReceiveSelect(channel, receiverIndex) - 4 -> doReceiveSelectOrNull(channel, receiverIndex) + 4 -> doReceiveCatchingSelect(channel, receiverIndex) } channel.cancel() } @@ -124,9 +124,9 @@ class BroadcastChannelMultiReceiveStressTest( } } - private suspend fun doReceiveOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) { + private suspend fun doReceiveCatching(channel: ReceiveChannel<Long>, receiverIndex: Int) { while (true) { - val stop = doReceived(receiverIndex, channel.receiveOrNull() ?: break) + val stop = doReceived(receiverIndex, channel.receiveCatching().getOrNull() ?: break) if (stop) break } } @@ -148,11 +148,11 @@ class BroadcastChannelMultiReceiveStressTest( } } - private suspend fun doReceiveSelectOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) { + private suspend fun doReceiveCatchingSelect(channel: ReceiveChannel<Long>, receiverIndex: Int) { while (true) { - val event = select<Long?> { channel.onReceiveOrNull { it } } ?: break + val event = select<Long?> { channel.onReceiveCatching { it.getOrNull() } } ?: break val stop = doReceived(receiverIndex, event) if (stop) break } } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt index 76713aa1..86adfee0 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -15,7 +15,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { // total counters private var sendCnt = 0 - private var offerFailedCnt = 0 + private var trySendFailedCnt = 0 private var receivedCnt = 0 private var undeliveredCnt = 0 @@ -23,7 +23,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { private var lastReceived = 0 private var dSendCnt = 0 private var dSendExceptionCnt = 0 - private var dOfferFailedCnt = 0 + private var dTrySendFailedCnt = 0 private var dReceivedCnt = 0 private val dUndeliveredCnt = AtomicInteger() @@ -43,30 +43,30 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { joinAll(j1, j2) // All elements must be either received or undelivered (IN every run) - if (dSendCnt - dOfferFailedCnt != dReceivedCnt + dUndeliveredCnt.get()) { + if (dSendCnt - dTrySendFailedCnt != dReceivedCnt + dUndeliveredCnt.get()) { println(" Send: $dSendCnt") - println("Send Exception: $dSendExceptionCnt") - println(" Offer failed: $dOfferFailedCnt") + println("Send exception: $dSendExceptionCnt") + println("trySend failed: $dTrySendFailedCnt") println(" Received: $dReceivedCnt") println(" Undelivered: ${dUndeliveredCnt.get()}") error("Failed") } - offerFailedCnt += dOfferFailedCnt + trySendFailedCnt += dTrySendFailedCnt receivedCnt += dReceivedCnt undeliveredCnt += dUndeliveredCnt.get() // clear for next run dSendCnt = 0 dSendExceptionCnt = 0 - dOfferFailedCnt = 0 + dTrySendFailedCnt = 0 dReceivedCnt = 0 dUndeliveredCnt.set(0) } // Stats - println(" Send: $sendCnt") - println(" Offer failed: $offerFailedCnt") - println(" Received: $receivedCnt") - println(" Undelivered: $undeliveredCnt") - assertEquals(sendCnt - offerFailedCnt, receivedCnt + undeliveredCnt) + println(" Send: $sendCnt") + println("trySend failed: $trySendFailedCnt") + println(" Received: $receivedCnt") + println(" Undelivered: $undeliveredCnt") + assertEquals(sendCnt - trySendFailedCnt, receivedCnt + undeliveredCnt) } private suspend fun sendOne(channel: Channel<Int>) { @@ -75,11 +75,11 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { try { when (Random.nextInt(2)) { 0 -> channel.send(i) - 1 -> if (!channel.offer(i)) { - dOfferFailedCnt++ + 1 -> if (!channel.trySend(i).isSuccess) { + dTrySendFailedCnt++ } } - } catch(e: Throwable) { + } catch (e: Throwable) { assertTrue(e is CancellationException) // the only exception possible in this test dSendExceptionCnt++ throw e @@ -89,7 +89,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { private suspend fun receiveOne(channel: Channel<Int>) { val received = when (Random.nextInt(3)) { 0 -> channel.receive() - 1 -> channel.receiveOrNull() ?: error("Cannot be closed yet") + 1 -> channel.receiveCatching().getOrElse { error("Cannot be closed yet") } 2 -> select { channel.onReceive { it } } @@ -99,4 +99,4 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { dReceivedCnt++ lastReceived = received } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt index f414c333..a6345cc5 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -60,10 +60,10 @@ class ChannelSendReceiveStressTest( launch(pool + CoroutineName("receiver$receiverIndex")) { when (receiverIndex % 5) { 0 -> doReceive(receiverIndex) - 1 -> doReceiveOrNull(receiverIndex) + 1 -> doReceiveCatching(receiverIndex) 2 -> doIterator(receiverIndex) 3 -> doReceiveSelect(receiverIndex) - 4 -> doReceiveSelectOrNull(receiverIndex) + 4 -> doReceiveCatchingSelect(receiverIndex) } receiversCompleted.incrementAndGet() } @@ -152,9 +152,9 @@ class ChannelSendReceiveStressTest( } } - private suspend fun doReceiveOrNull(receiverIndex: Int) { + private suspend fun doReceiveCatching(receiverIndex: Int) { while (true) { - doReceived(receiverIndex, channel.receiveOrNull() ?: break) + doReceived(receiverIndex, channel.receiveCatching().getOrNull() ?: break) } } @@ -173,10 +173,10 @@ class ChannelSendReceiveStressTest( } } - private suspend fun doReceiveSelectOrNull(receiverIndex: Int) { + private suspend fun doReceiveCatchingSelect(receiverIndex: Int) { while (true) { - val event = select<Int?> { channel.onReceiveOrNull { it } } ?: break + val event = select<Int?> { channel.onReceiveCatching { it.getOrNull() } } ?: break doReceived(receiverIndex, event) } } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt index 1188329a..12334326 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -68,7 +68,7 @@ class ChannelUndeliveredElementStressTest(private val kind: TestChannelKind) : T try { block() } finally { - if (!done.offer(true)) + if (!done.trySend(true).isSuccess) error(IllegalStateException("failed to offer to done channel")) } } @@ -188,9 +188,9 @@ class ChannelUndeliveredElementStressTest(private val kind: TestChannelKind) : T val receivedData = when (receiveMode) { 1 -> channel.receive() 2 -> select { channel.onReceive { it } } - 3 -> channel.receiveOrNull() ?: error("Should not be closed") - 4 -> select { channel.onReceiveOrNull { it ?: error("Should not be closed") } } - 5 -> channel.receiveOrClosed().value + 3 -> channel.receiveCatching().getOrElse { error("Should not be closed") } + 4 -> select { channel.onReceiveCatching { it.getOrElse { error("Should not be closed") } } } + 5 -> channel.receiveCatching().getOrThrow() 6 -> { val iterator = channel.iterator() check(iterator.hasNext()) { "Should not be closed" } diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt index da20f0c5..8512aebc 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt @@ -11,7 +11,7 @@ import kotlin.test.* class ChannelsJvmTest : TestBase() { @Test - fun testBlocking() { + fun testTrySendBlocking() { val ch = Channel<Int>() val sum = GlobalScope.async { var sum = 0 @@ -19,9 +19,36 @@ class ChannelsJvmTest : TestBase() { sum } repeat(10) { - ch.sendBlocking(it) + assertTrue(ch.trySendBlocking(it).isSuccess) } ch.close() assertEquals(45, runBlocking { sum.await() }) } + + @Test + fun testTrySendBlockingClosedChannel() { + run { + val channel = Channel<Unit>().also { it.close() } + channel.trySendBlocking(Unit) + .onSuccess { expectUnreached() } + .onFailure { assertTrue(it is ClosedSendChannelException) } + .also { assertTrue { it.isClosed } } + } + + run { + val channel = Channel<Unit>().also { it.close(TestException()) } + channel.trySendBlocking(Unit) + .onSuccess { expectUnreached() } + .onFailure { assertTrue(it is TestException) } + .also { assertTrue { it.isClosed } } + } + + run { + val channel = Channel<Unit>().also { it.cancel(TestCancellationException()) } + channel.trySendBlocking(Unit) + .onSuccess { expectUnreached() } + .onFailure { assertTrue(it is TestCancellationException) } + .also { assertTrue { it.isClosed } } + } + } } diff --git a/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt index eb7be575..2b3c05bc 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt @@ -29,7 +29,7 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() { launch(Dispatchers.Default + CoroutineName("Sender$senderId")) { repeat(nEvents) { i -> if (i % nSenders == senderId) { - broadcast.offer(i) + broadcast.trySend(i) sentTotal.incrementAndGet() yield() } @@ -63,7 +63,7 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() { try { withTimeout(timeLimit) { senders.forEach { it.join() } - broadcast.offer(nEvents) // last event to signal receivers termination + broadcast.trySend(nEvents) // last event to signal receivers termination receivers.forEach { it.join() } } } catch (e: CancellationException) { @@ -86,4 +86,4 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() { cancel() value } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt index 316b3785..793d7e44 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt @@ -1,15 +1,12 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels import kotlinx.coroutines.* -import org.junit.After -import org.junit.Test -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.atomic.AtomicReference -import kotlin.coroutines.* +import org.junit.* +import java.util.concurrent.atomic.* class ConflatedChannelCloseStressTest : TestBase() { @@ -37,12 +34,9 @@ class ConflatedChannelCloseStressTest : TestBase() { var x = senderId try { while (isActive) { - try { - curChannel.get().offer(x) + curChannel.get().trySend(x).onSuccess { x += nSenders sent.incrementAndGet() - } catch (e: ClosedSendChannelException) { - // ignore } } } finally { @@ -64,7 +58,9 @@ class ConflatedChannelCloseStressTest : TestBase() { } val receiver = async(pool + NonCancellable) { while (isActive) { - curChannel.get().receiveOrNull() + curChannel.get().receiveCatching().getOrElse { + it?.let { throw it } + } received.incrementAndGet() } } @@ -110,4 +106,4 @@ class ConflatedChannelCloseStressTest : TestBase() { } class StopException : Exception() -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt index 51789078..fbc28a18 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -48,7 +48,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() delayChannel.cancel() delay(5100) - assertFailsWith<CancellationException> { delayChannel.poll() } + assertFailsWith<CancellationException> { delayChannel.tryReceive().getOrThrow() } } } @@ -112,13 +112,13 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() var sum = 0 var n = 0 whileSelect { - this@averageInTimeWindow.onReceiveOrClosed { + this@averageInTimeWindow.onReceiveCatching { if (it.isClosed) { // Send leftovers and bail out if (n != 0) send(sum / n.toDouble()) false } else { - sum += it.value + sum += it.getOrThrow() ++n true } @@ -159,9 +159,9 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() } } -fun ReceiveChannel<Unit>.checkEmpty() = assertNull(poll()) +fun ReceiveChannel<Unit>.checkEmpty() = assertNull(tryReceive().getOrNull()) fun ReceiveChannel<Unit>.checkNotEmpty() { - assertNotNull(poll()) - assertNull(poll()) + assertNotNull(tryReceive().getOrNull()) + assertNull(tryReceive().getOrNull()) } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt index f52f8b5b..2d8c0ebc 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.exceptions @@ -38,13 +38,6 @@ class StackTraceRecoveryChannelsTest : TestBase() { } @Test - fun testReceiveOrNullFromClosedChannel() = runTest { - val channel = Channel<Int>() - channel.close(RecoverableTestException()) - channelReceiveOrNull(channel) - } - - @Test fun testSendToClosedChannel() = runTest { val channel = Channel<Int>() channel.close(RecoverableTestException()) @@ -67,7 +60,6 @@ class StackTraceRecoveryChannelsTest : TestBase() { } private suspend fun channelReceive(channel: Channel<Int>) = channelOp { channel.receive() } - private suspend fun channelReceiveOrNull(channel: Channel<Int>) = channelOp { channel.receiveOrNull() } private suspend inline fun channelOp(block: () -> Unit) { try { @@ -145,25 +137,6 @@ class StackTraceRecoveryChannelsTest : TestBase() { deferred.await() } - // See https://github.com/Kotlin/kotlinx.coroutines/issues/950 - @Test - fun testCancelledOffer() = runTest { - expect(1) - val job = Job() - val actor = actor<Int>(job, Channel.UNLIMITED) { - consumeEach { - expectUnreached() // is cancelled before offer - } - } - job.cancel() - try { - actor.offer(1) - } catch (e: Exception) { - verifyStackTrace("channels/${name.methodName}", e) - finish(2) - } - } - private suspend fun Channel<Int>.sendWithContext(ctx: CoroutineContext) = withContext(ctx) { sendInChannel() yield() // TCE @@ -177,4 +150,4 @@ class StackTraceRecoveryChannelsTest : TestBase() { private suspend fun Channel<Int>.sendFromScope() = coroutineScope { sendWithContext(wrapperDispatcher(coroutineContext)) } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt index 70336659..dba738a8 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.exceptions import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import org.junit.Test import kotlin.test.* @@ -71,4 +72,56 @@ class StackTraceRecoveryCustomExceptionsTest : TestBase() { assertEquals("custom", cause.message) } } + + class WrongMessageException(token: String) : RuntimeException("Token $token") + + @Test + fun testWrongMessageException() = runTest { + val result = runCatching { + coroutineScope<Unit> { + throw WrongMessageException("OK") + } + } + val ex = result.exceptionOrNull() ?: error("Expected to fail") + assertTrue(ex is WrongMessageException) + assertEquals("Token OK", ex.message) + } + + @Test + fun testWrongMessageExceptionInChannel() = runTest { + val result = produce<Unit>(SupervisorJob() + Dispatchers.Unconfined) { + throw WrongMessageException("OK") + } + val ex = runCatching { + @Suppress("ControlFlowWithEmptyBody") + for (unit in result) { + // Iterator has a special code path + } + }.exceptionOrNull() ?: error("Expected to fail") + assertTrue(ex is WrongMessageException) + assertEquals("Token OK", ex.message) + } + + class CopyableWithCustomMessage( + message: String?, + cause: Throwable? = null + ) : RuntimeException(message, cause), + CopyableThrowable<CopyableWithCustomMessage> { + + override fun createCopy(): CopyableWithCustomMessage { + return CopyableWithCustomMessage("Recovered: [$message]", cause) + } + } + + @Test + fun testCustomCopyableMessage() = runTest { + val result = runCatching { + coroutineScope<Unit> { + throw CopyableWithCustomMessage("OK") + } + } + val ex = result.exceptionOrNull() ?: error("Expected to fail") + assertTrue(ex is CopyableWithCustomMessage) + assertEquals("Recovered: [OK]", ex.message) + } } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt index bea18a43..a85bb7a2 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt @@ -86,7 +86,6 @@ class StackTraceRecoveryNestedScopesTest : TestBase() { "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$callWithTimeout\$2.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:37)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$callCoroutineScope\$2.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:43)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$testAwaitNestedScopes\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:68)\n" + - "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest.verifyAwait(StackTraceRecoveryNestedScopesTest.kt:76)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$testAwaitNestedScopes\$1.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:71)\n" + "Caused by: kotlinx.coroutines.RecoverableTestException\n" + diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt index 5073b7fd..02607c03 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt @@ -58,7 +58,7 @@ class StackTraceRecoveryNestedTest : TestBase() { try { rootAsync.awaitRootLevel() } catch (e: RecoverableTestException) { - e.verifyException("await\$suspendImpl", "awaitRootLevel") + e.verifyException("awaitRootLevel") finish(8) } } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt index 290420e4..0d7648c5 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt @@ -45,9 +45,9 @@ class StackTraceRecoverySelectTest : TestBase() { private suspend fun doSelectAwait(deferred: Deferred<Unit>): Int { return select { deferred.onAwait { - yield() // Hide the stackstrace + yield() // Hide the frame 42 } } } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt index dbbd77c4..0a8b6530 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt @@ -34,14 +34,13 @@ class StackTraceRecoveryTest : TestBase() { val deferred = createDeferred(3) val traces = listOf( "java.util.concurrent.ExecutionException\n" + - "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$1\$1.invokeSuspend(StackTraceRecoveryTest.kt:99)\n" + + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$createDeferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:99)\n" + "\t(Coroutine boundary)\n" + - "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.oneMoreNestedMethod(StackTraceRecoveryTest.kt:49)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.nestedMethod(StackTraceRecoveryTest.kt:44)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1.invokeSuspend(StackTraceRecoveryTest.kt:17)\n", "Caused by: java.util.concurrent.ExecutionException\n" + - "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$1\$1.invokeSuspend(StackTraceRecoveryTest.kt:21)\n" + + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$createDeferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:21)\n" + "\tat kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)\n" ) nestedMethod(deferred, *traces.toTypedArray()) @@ -59,7 +58,6 @@ class StackTraceRecoveryTest : TestBase() { "java.util.concurrent.ExecutionException\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testCompletedAsync\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:44)\n" + "\t(Coroutine boundary)\n" + - "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.oneMoreNestedMethod(StackTraceRecoveryTest.kt:81)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.nestedMethod(StackTraceRecoveryTest.kt:75)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testCompletedAsync\$1.invokeSuspend(StackTraceRecoveryTest.kt:71)", @@ -94,7 +92,6 @@ class StackTraceRecoveryTest : TestBase() { "kotlinx.coroutines.RecoverableTestException\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testWithContext\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:143)\n" + "\t(Coroutine boundary)\n" + - "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.innerMethod(StackTraceRecoveryTest.kt:158)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$outerMethod\$2.invokeSuspend(StackTraceRecoveryTest.kt:151)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.outerMethod(StackTraceRecoveryTest.kt:150)\n" + @@ -132,7 +129,6 @@ class StackTraceRecoveryTest : TestBase() { "kotlinx.coroutines.RecoverableTestException\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testCoroutineScope\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:143)\n" + "\t(Coroutine boundary)\n" + - "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.innerMethod(StackTraceRecoveryTest.kt:158)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$outerScopedMethod\$2\$1.invokeSuspend(StackTraceRecoveryTest.kt:193)\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$outerScopedMethod\$2.invokeSuspend(StackTraceRecoveryTest.kt:151)\n" + @@ -228,13 +224,13 @@ class StackTraceRecoveryTest : TestBase() { val e = exception assertNotNull(e) verifyStackTrace(e, "kotlinx.coroutines.RecoverableTestException\n" + - "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.throws(StackTraceRecoveryTest.kt:280)\n" + - "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$throws\$1.invokeSuspend(StackTraceRecoveryTest.kt)\n" + - "\t(Coroutine boundary)\n" + - "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" + - "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.awaiter(StackTraceRecoveryTest.kt:285)\n" + - "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testNonDispatchedRecovery\$await\$1.invokeSuspend(StackTraceRecoveryTest.kt:291)\n" + - "Caused by: kotlinx.coroutines.RecoverableTestException") + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.throws(StackTraceRecoveryTest.kt:280)\n" + + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.access\$throws(StackTraceRecoveryTest.kt:20)\n" + + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$throws\$1.invokeSuspend(StackTraceRecoveryTest.kt)\n" + + "\t(Coroutine boundary)\n" + + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.awaiter(StackTraceRecoveryTest.kt:285)\n" + + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testNonDispatchedRecovery\$await\$1.invokeSuspend(StackTraceRecoveryTest.kt:291)\n" + + "Caused by: kotlinx.coroutines.RecoverableTestException") } private class Callback(val cont: CancellableContinuation<*>) @@ -261,22 +257,8 @@ class StackTraceRecoveryTest : TestBase() { private suspend fun awaitCallback(channel: Channel<Callback>) { suspendCancellableCoroutine<Unit> { cont -> - channel.offer(Callback(cont)) + channel.trySend(Callback(cont)) } yield() // nop to make sure it is not a tail call } - - @Test - fun testWrongMessageException() = runTest { - val result = runCatching { - coroutineScope<Unit> { - throw WrongMessageException("OK") - } - } - val ex = result.exceptionOrNull() ?: error("Expected to fail") - assertTrue(ex is WrongMessageException) - assertEquals("Token OK", ex.message) - } - - public class WrongMessageException(token: String) : RuntimeException("Token $token") } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt b/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt index 6034fccb..edce175d 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.exceptions @@ -15,8 +15,8 @@ class SuppressionTests : TestBase() { @Test fun testNotificationsWithException() = runTest { expect(1) - val coroutineContext = kotlin.coroutines.coroutineContext // workaround for KT-22984 - val coroutine = object : AbstractCoroutine<String>(coroutineContext, false) { + val coroutineContext = kotlin.coroutines.coroutineContext + NonCancellable // workaround for KT-22984 + val coroutine = object : AbstractCoroutine<String>(coroutineContext, true, false) { override fun onStart() { expect(3) } @@ -82,4 +82,4 @@ class SuppressionTests : TestBase() { assertTrue(e.cause!!.suppressed.isEmpty()) } } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt index e3db2626..f1be284c 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt @@ -36,7 +36,7 @@ class CallbackFlowTest : TestBase() { fun testThrowingConsumer() = runTest { var i = 0 val api = CallbackApi { - runCatching { it.offer(++i) } + it.trySend(++i) } val flow = callbackFlow<Int> { @@ -77,13 +77,13 @@ class CallbackFlowTest : TestBase() { var i = 0 val api = CallbackApi { if (i < 5) { - it.offer(++i) + it.trySend(++i) } else { it.close(RuntimeException()) } } - val flow = callbackFlow<Int>() { + val flow = callbackFlow<Int> { api.start(channel) awaitClose { api.stop() diff --git a/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt b/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt new file mode 100644 index 00000000..98240fc9 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt @@ -0,0 +1,61 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.internal.* +import org.junit.* + +/** + * Tests that shared flows keep strong reference to their source flows. + * See https://github.com/Kotlin/kotlinx.coroutines/issues/2557 + */ +@OptIn(DelicateCoroutinesApi::class) +class SharingReferenceTest : TestBase() { + private val token = object {} + + /* + * Single-threaded executor that we are using to ensure that the flow being sharing actually + * suspended (spilled its locals, attached to parent), so we can verify reachability. + * Without that, it's possible to have a situation where target flow is still + * being strongly referenced (by its dispatcher), but the test already tries to test reachability and fails. + */ + @get:Rule + val executor = ExecutorRule(1) + + private val weakEmitter = flow { + emit(null) + // suspend forever without keeping a strong reference to continuation -- this is a model of + // a callback API that does not keep a strong reference it is listeners, but works + suspendCancellableCoroutine<Unit> { } + // using the token here to make it easily traceable + emit(token) + } + + @Test + fun testShareInReference() { + val flow = weakEmitter.shareIn(ContextScope(executor), SharingStarted.Eagerly, 0) + linearize() + FieldWalker.assertReachableCount(1, flow) { it === token } + } + + @Test + fun testStateInReference() { + val flow = weakEmitter.stateIn(ContextScope(executor), SharingStarted.Eagerly, null) + linearize() + FieldWalker.assertReachableCount(1, flow) { it === token } + } + + @Test + fun testStateInSuspendingReference() = runTest { + val flow = weakEmitter.stateIn(GlobalScope) + linearize() + FieldWalker.assertReachableCount(1, flow) { it === token } + } + + private fun linearize() { + runBlocking(executor) { } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt index dc3cd43c..e55eaad1 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt @@ -62,7 +62,7 @@ class StateFlowStressTest : TestBase() { for (second in 1..nSeconds) { delay(1000) val cs = collected.map { it.sum() } - println("$second: emitted=${emitted.sum()}, collected=${cs.min()}..${cs.max()}") + println("$second: emitted=${emitted.sum()}, collected=${cs.minOrNull()}..${cs.maxOrNull()}") } emitters.cancelAndJoin() collectors.cancelAndJoin() @@ -77,4 +77,4 @@ class StateFlowStressTest : TestBase() { @Test fun testTenEmittersAndCollectors() = stress(10, 10) -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt new file mode 100644 index 00000000..660ed0aa --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt @@ -0,0 +1,44 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import org.junit.* +import kotlin.test.* +import kotlin.test.Test + +class StateFlowUpdateStressTest : TestBase() { + private val iterations = 1_000_000 * stressTestMultiplier + + @get:Rule + public val executor = ExecutorRule(2) + + @Test + fun testUpdate() = doTest { update { it + 1 } } + + @Test + fun testUpdateAndGet() = doTest { updateAndGet { it + 1 } } + + @Test + fun testGetAndUpdate() = doTest { getAndUpdate { it + 1 } } + + private fun doTest(increment: MutableStateFlow<Int>.() -> Unit) = runTest { + val flow = MutableStateFlow(0) + val j1 = launch(Dispatchers.Default) { + repeat(iterations / 2) { + flow.increment() + } + } + + val j2 = launch(Dispatchers.Default) { + repeat(iterations / 2) { + flow.increment() + } + } + + joinAll(j1, j2) + assertEquals(iterations, flow.value) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt index f04b100a..529f8817 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt @@ -7,11 +7,10 @@ package kotlinx.coroutines.guide.exampleBasic01 import kotlinx.coroutines.* -fun main() { - GlobalScope.launch { // launch a new coroutine in background and continue +fun main() = runBlocking { // this: CoroutineScope + launch { // launch a new coroutine and continue delay(1000L) // non-blocking delay for 1 second (default time unit is ms) println("World!") // print after delay } - println("Hello,") // main thread continues while coroutine is delayed - Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive + println("Hello") // main coroutine continues while a previous one is delayed } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt index bfece26b..6bf2af4c 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt @@ -7,13 +7,13 @@ package kotlinx.coroutines.guide.exampleBasic02 import kotlinx.coroutines.* -fun main() { - GlobalScope.launch { // launch a new coroutine in background and continue - delay(1000L) - println("World!") - } - println("Hello,") // main thread continues here immediately - runBlocking { // but this expression blocks the main thread - delay(2000L) // ... while we delay for 2 seconds to keep JVM alive - } +fun main() = runBlocking { // this: CoroutineScope + launch { doWorld() } + println("Hello") +} + +// this is your first suspending function +suspend fun doWorld() { + delay(1000L) + println("World!") } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt index 8541f604..67b6894a 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt @@ -7,11 +7,14 @@ package kotlinx.coroutines.guide.exampleBasic03 import kotlinx.coroutines.* -fun main() = runBlocking<Unit> { // start main coroutine - GlobalScope.launch { // launch a new coroutine in background and continue +fun main() = runBlocking { + doWorld() +} + +suspend fun doWorld() = coroutineScope { // this: CoroutineScope + launch { delay(1000L) println("World!") } - println("Hello,") // main coroutine continues here immediately - delay(2000L) // delaying for 2 seconds to keep JVM alive + println("Hello") } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt index 69f82771..efac7085 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt @@ -7,11 +7,21 @@ package kotlinx.coroutines.guide.exampleBasic04 import kotlinx.coroutines.* +// Sequentially executes doWorld followed by "Done" fun main() = runBlocking { - val job = GlobalScope.launch { // launch a new coroutine and keep a reference to its Job + doWorld() + println("Done") +} + +// Concurrently executes both sections +suspend fun doWorld() = coroutineScope { // this: CoroutineScope + launch { + delay(2000L) + println("World 2") + } + launch { delay(1000L) - println("World!") + println("World 1") } - println("Hello,") - job.join() // wait until child coroutine completes + println("Hello") } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt index 9d530b5f..193f2cc3 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt @@ -7,10 +7,12 @@ package kotlinx.coroutines.guide.exampleBasic05 import kotlinx.coroutines.* -fun main() = runBlocking { // this: CoroutineScope - launch { // launch a new coroutine in the scope of runBlocking +fun main() = runBlocking { + val job = launch { // launch a new coroutine and keep a reference to its Job delay(1000L) println("World!") } - println("Hello,") + println("Hello") + job.join() // wait until child coroutine completes + println("Done") } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt index b53d3b89..24b890a0 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt @@ -7,21 +7,11 @@ package kotlinx.coroutines.guide.exampleBasic06 import kotlinx.coroutines.* -fun main() = runBlocking { // this: CoroutineScope - launch { - delay(200L) - println("Task from runBlocking") - } - - coroutineScope { // Creates a coroutine scope +fun main() = runBlocking { + repeat(100_000) { // launch a lot of coroutines launch { - delay(500L) - println("Task from nested launch") + delay(5000L) + print(".") } - - delay(100L) - println("Task from coroutine scope") // This line will be printed before the nested launch } - - println("Coroutine scope is over") // This line is not printed until the nested launch completes } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt deleted file mode 100644 index cd854ce8..00000000 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit. -package kotlinx.coroutines.guide.exampleBasic07 - -import kotlinx.coroutines.* - -fun main() = runBlocking { - launch { doWorld() } - println("Hello,") -} - -// this is your first suspending function -suspend fun doWorld() { - delay(1000L) - println("World!") -} diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-08.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-08.kt deleted file mode 100644 index 0a346e0b..00000000 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-08.kt +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit. -package kotlinx.coroutines.guide.exampleBasic08 - -import kotlinx.coroutines.* - -fun main() = runBlocking { - repeat(100_000) { // launch a lot of coroutines - launch { - delay(5000L) - print(".") - } - } -} diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-09.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-09.kt deleted file mode 100644 index c9783ee5..00000000 --- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-09.kt +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit. -package kotlinx.coroutines.guide.exampleBasic09 - -import kotlinx.coroutines.* - -fun main() = runBlocking { - GlobalScope.launch { - repeat(1000) { i -> - println("I'm sleeping $i ...") - delay(500L) - } - } - delay(1300L) // just quit after delay -} diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt index 312dc72b..35536a7d 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt @@ -23,10 +23,12 @@ fun main() { println("Completed in $time ms") } +@OptIn(DelicateCoroutinesApi::class) fun somethingUsefulOneAsync() = GlobalScope.async { doSomethingUsefulOne() } +@OptIn(DelicateCoroutinesApi::class) fun somethingUsefulTwoAsync() = GlobalScope.async { doSomethingUsefulTwo() } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt index e23eaf25..c6ad4516 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt @@ -10,9 +10,9 @@ import kotlinx.coroutines.* fun main() = runBlocking<Unit> { // launch a coroutine to process some kind of incoming request val request = launch { - // it spawns two other jobs, one with GlobalScope - GlobalScope.launch { - println("job1: I run in GlobalScope and execute independently!") + // it spawns two other jobs + launch(Job()) { + println("job1: I run in my own Job and execute independently!") delay(1000) println("job1: I am not affected by cancellation of the request") } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt index e08ddd08..24cbabe0 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.guide.exampleExceptions01 import kotlinx.coroutines.* +@OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val job = GlobalScope.launch { // root coroutine with launch println("Throwing exception from launch") diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt index 67fdaa71..c3ab68a5 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.guide.exampleExceptions02 import kotlinx.coroutines.* +@OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val handler = CoroutineExceptionHandler { _, exception -> println("CoroutineExceptionHandler got $exception") diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt index 9c9b43d2..b966c1ea 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.guide.exampleExceptions04 import kotlinx.coroutines.* +@OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val handler = CoroutineExceptionHandler { _, exception -> println("CoroutineExceptionHandler got $exception") diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt index 04f9385f..5f1f3d89 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt @@ -10,6 +10,7 @@ import kotlinx.coroutines.exceptions.* import kotlinx.coroutines.* import java.io.* +@OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val handler = CoroutineExceptionHandler { _, exception -> println("CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}") diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt index 5a5b276b..bc9f77b9 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt @@ -8,6 +8,7 @@ package kotlinx.coroutines.guide.exampleExceptions06 import kotlinx.coroutines.* import java.io.* +@OptIn(DelicateCoroutinesApi::class) fun main() = runBlocking { val handler = CoroutineExceptionHandler { _, exception -> println("CoroutineExceptionHandler got $exception") diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt index 57fe6382..22380d3a 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt @@ -11,17 +11,21 @@ import kotlinx.coroutines.selects.* suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String = select<String> { - a.onReceiveOrNull { value -> - if (value == null) - "Channel 'a' is closed" - else + a.onReceiveCatching { it -> + val value = it.getOrNull() + if (value != null) { "a -> '$value'" + } else { + "Channel 'a' is closed" + } } - b.onReceiveOrNull { value -> - if (value == null) - "Channel 'b' is closed" - else + b.onReceiveCatching { it -> + val value = it.getOrNull() + if (value != null) { "b -> '$value'" + } else { + "Channel 'b' is closed" + } } } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt index 464e9b20..68b44564 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt @@ -13,12 +13,12 @@ fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = var current = input.receive() // start with first received deferred value while (isActive) { // loop while not cancelled/closed val next = select<Deferred<String>?> { // return next deferred value from this select or null - input.onReceiveOrNull { update -> - update // replaces next value to wait + input.onReceiveCatching { update -> + update.getOrNull() } - current.onAwait { value -> + current.onAwait { value -> send(value) // send value that current deferred has produced - input.receiveOrNull() // and use the next deferred from the input channel + input.receiveCatching().getOrNull() // and use the next deferred from the input channel } } if (next == null) { diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt index 765cd0b9..7e54fb1d 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt @@ -12,7 +12,7 @@ class BasicsGuideTest { @Test fun testExampleBasic01() { test("ExampleBasic01") { kotlinx.coroutines.guide.exampleBasic01.main() }.verifyLines( - "Hello,", + "Hello", "World!" ) } @@ -20,7 +20,7 @@ class BasicsGuideTest { @Test fun testExampleBasic02() { test("ExampleBasic02") { kotlinx.coroutines.guide.exampleBasic02.main() }.verifyLines( - "Hello,", + "Hello", "World!" ) } @@ -28,7 +28,7 @@ class BasicsGuideTest { @Test fun testExampleBasic03() { test("ExampleBasic03") { kotlinx.coroutines.guide.exampleBasic03.main() }.verifyLines( - "Hello,", + "Hello", "World!" ) } @@ -36,50 +36,26 @@ class BasicsGuideTest { @Test fun testExampleBasic04() { test("ExampleBasic04") { kotlinx.coroutines.guide.exampleBasic04.main() }.verifyLines( - "Hello,", - "World!" + "Hello", + "World 1", + "World 2", + "Done" ) } @Test fun testExampleBasic05() { test("ExampleBasic05") { kotlinx.coroutines.guide.exampleBasic05.main() }.verifyLines( - "Hello,", - "World!" + "Hello", + "World!", + "Done" ) } @Test fun testExampleBasic06() { - test("ExampleBasic06") { kotlinx.coroutines.guide.exampleBasic06.main() }.verifyLines( - "Task from coroutine scope", - "Task from runBlocking", - "Task from nested launch", - "Coroutine scope is over" - ) - } - - @Test - fun testExampleBasic07() { - test("ExampleBasic07") { kotlinx.coroutines.guide.exampleBasic07.main() }.verifyLines( - "Hello,", - "World!" - ) - } - - @Test - fun testExampleBasic08() { - test("ExampleBasic08") { kotlinx.coroutines.guide.exampleBasic08.main() }.also { lines -> + test("ExampleBasic06") { kotlinx.coroutines.guide.exampleBasic06.main() }.also { lines -> check(lines.size == 1 && lines[0] == ".".repeat(100_000)) } } - - @Test - fun testExampleBasic09() { - test("ExampleBasic09") { kotlinx.coroutines.guide.exampleBasic09.main() }.verifyLines( - "I'm sleeping 0 ...", - "I'm sleeping 1 ...", - "I'm sleeping 2 ..." - ) - } } diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt index d6f1c21d..1a84fb94 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt @@ -57,7 +57,7 @@ class DispatcherGuideTest { @Test fun testExampleContext06() { test("ExampleContext06") { kotlinx.coroutines.guide.exampleContext06.main() }.verifyLines( - "job1: I run in GlobalScope and execute independently!", + "job1: I run in my own Job and execute independently!", "job2: I am a child of the request coroutine", "job1: I am not affected by cancellation of the request", "main: Who has survived request cancellation?" diff --git a/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt b/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt new file mode 100644 index 00000000..30fbfee2 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt @@ -0,0 +1,51 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines // Trick to make guide tests use these declarations with executors that can be closed on our side implicitly + +import java.util.concurrent.* +import java.util.concurrent.atomic.* +import kotlin.coroutines.* + +internal fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher = ClosedAfterGuideTestDispatcher(1, name) + +internal fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher = + ClosedAfterGuideTestDispatcher(nThreads, name) + +internal class PoolThread( + @JvmField val dispatcher: ExecutorCoroutineDispatcher, // for debugging & tests + target: Runnable, name: String +) : Thread(target, name) { + init { + isDaemon = true + } +} + +private class ClosedAfterGuideTestDispatcher( + private val nThreads: Int, + private val name: String +) : ExecutorCoroutineDispatcher() { + private val threadNo = AtomicInteger() + + override val executor: Executor = + Executors.newScheduledThreadPool(nThreads, object : ThreadFactory { + override fun newThread(target: java.lang.Runnable): Thread { + return PoolThread( + this@ClosedAfterGuideTestDispatcher, + target, + if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet() + ) + } + }) + + override fun dispatch(context: CoroutineContext, block: Runnable) { + executor.execute(wrapTask(block)) + } + + override fun close() { + (executor as ExecutorService).shutdown() + } + + override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]" +} diff --git a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt index 7eda9043..2e61ec6b 100644 --- a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt +++ b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.knit @@ -11,8 +11,6 @@ import kotlinx.knit.test.* import java.util.concurrent.* import kotlin.test.* -fun wrapTask(block: Runnable) = kotlinx.coroutines.wrapTask(block) - // helper function to dump exception to stdout for ease of debugging failed tests private inline fun <T> outputException(name: String, block: () -> T): T = try { block() } @@ -176,4 +174,4 @@ private inline fun List<String>.verify(verification: () -> Unit) { } throw t } -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt index fbd5c0d8..74cc1783 100644 --- a/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt +++ b/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt @@ -63,11 +63,12 @@ abstract class ChannelLincheckTestBase( } @Operation - fun offer(@Param(name = "value") value: Int): Any = try { - c.offer(value) - } catch (e: NumberedCancellationException) { - e.testResult - } + fun trySend(@Param(name = "value") value: Int): Any = c.trySend(value) + .onSuccess { return true } + .onFailure { + return if (it is NumberedCancellationException) it.testResult + else false + } // TODO: this operation should be (and can be!) linearizable, but is not // @Operation @@ -85,11 +86,10 @@ abstract class ChannelLincheckTestBase( } @Operation - fun poll(): Any? = try { - c.poll() - } catch (e: NumberedCancellationException) { - e.testResult - } + fun tryReceive(): Any? = + c.tryReceive() + .onSuccess { return it } + .onFailure { return if (it is NumberedCancellationException) it.testResult else null } // TODO: this operation should be (and can be!) linearizable, but is not // @Operation @@ -131,7 +131,7 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta private val buffer = ArrayList<Int>() private var closedMessage: String? = null - suspend fun send(x: Int): Any = when (val offerRes = offer(x)) { + suspend fun send(x: Int): Any = when (val offerRes = trySend(x)) { true -> Unit false -> suspendCancellableCoroutine { cont -> senders.add(cont to x) @@ -139,7 +139,7 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta else -> offerRes } - fun offer(element: Int): Any { + fun trySend(element: Int): Any { if (closedMessage !== null) return closedMessage!! if (capacity == CONFLATED) { if (resumeFirstReceiver(element)) return true @@ -163,11 +163,11 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta return false } - suspend fun receive(): Any = poll() ?: suspendCancellableCoroutine { cont -> + suspend fun receive(): Any = tryReceive() ?: suspendCancellableCoroutine { cont -> receivers.add(cont) } - fun poll(): Any? { + fun tryReceive(): Any? { if (buffer.isNotEmpty()) { val el = buffer.removeAt(0) resumeFirstSender().also { @@ -221,4 +221,4 @@ private fun <T> CancellableContinuation<T>.resume(res: T): Boolean { val token = tryResume(res) ?: return false completeResume(token) return true -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt index 6e350660..f7f59eef 100644 --- a/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt +++ b/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:Suppress("unused") package kotlinx.coroutines.lincheck @@ -9,10 +9,15 @@ import kotlinx.coroutines.sync.* import org.jetbrains.kotlinx.lincheck.* import org.jetbrains.kotlinx.lincheck.annotations.Operation import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.* +import org.junit.* class MutexLincheckTest : AbstractLincheckTest() { private val mutex = Mutex() + override fun modelCheckingTest() { + // Ignored via empty body as the only way + } + @Operation fun tryLock() = mutex.tryLock() @@ -29,4 +34,4 @@ class MutexLincheckTest : AbstractLincheckTest() { checkObstructionFreedom() override fun extractState() = mutex.isLocked -}
\ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt index 84ce773c..2b471d7f 100644 --- a/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt +++ b/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt @@ -16,7 +16,7 @@ abstract class SemaphoreLincheckTestBase(permits: Int) : AbstractLincheckTest() @Operation fun tryAcquire() = semaphore.tryAcquire() - @Operation(promptCancellation = true) + @Operation(promptCancellation = true, allowExtraSuspension = true) suspend fun acquire() = semaphore.acquire() @Operation(handleExceptionsAsResult = [IllegalStateException::class]) diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt index f31752c8..fe09440f 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt @@ -101,7 +101,7 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { firstBarrier.await() secondBarrier.await() blockingTasks.joinAll() - checkPoolThreadsCreated(21..22) + checkPoolThreadsCreated(21 /* blocking tasks + 1 for CPU */..20 + CORES_COUNT) } @Test @@ -122,7 +122,7 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { barrier.await() blockingTasks.joinAll() // There may be race when multiple CPU threads are trying to lazily created one more - checkPoolThreadsCreated(104..120) + checkPoolThreadsCreated(101..100 + CORES_COUNT) } @Test diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt index bfabf5b2..dd969bdd 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt @@ -39,17 +39,9 @@ abstract class SchedulerTestBase : TestBase() { ) } - /** - * Asserts that any number of pool worker threads in [range] exists at the time of method invocation - */ - fun checkPoolThreadsExist(range: IntRange) { - val threads = Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker }.count() - assertTrue(threads in range, "Expected threads in $range interval, but has $threads") - } - private fun maxSequenceNumber(): Int? { return Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker } - .map { sequenceNumber(it.name) }.max() + .map { sequenceNumber(it.name) }.maxOrNull() } private fun sequenceNumber(threadName: String): Int { @@ -105,4 +97,4 @@ abstract class SchedulerTestBase : TestBase() { } } } -}
\ No newline at end of file +} |