aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/jvm/test
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/jvm/test')
-rw-r--r--kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt6
-rw-r--r--kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt44
-rw-r--r--kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt12
-rw-r--r--kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt14
-rw-r--r--kotlinx-coroutines-core/jvm/test/FieldWalker.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/test/IntellijIdeaDebuggerEvaluatorCompatibilityTest.kt56
-rw-r--r--kotlinx-coroutines-core/jvm/test/JoinStressTest.kt (renamed from kotlinx-coroutines-core/jvm/test/JoinStrTest.kt)2
-rw-r--r--kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt41
-rw-r--r--kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt41
-rw-r--r--kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt13
-rw-r--r--kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt6
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt14
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt31
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt16
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt36
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt16
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt10
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt31
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt6
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt20
-rw-r--r--kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt14
-rw-r--r--kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt31
-rw-r--r--kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt53
-rw-r--r--kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt1
-rw-r--r--kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt38
-rw-r--r--kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt8
-rw-r--r--kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt6
-rw-r--r--kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt61
-rw-r--r--kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt44
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt7
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt18
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt11
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt18
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt8
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt18
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt19
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-basic-08.kt17
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-basic-09.kt18
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt6
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt1
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt1
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt1
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt1
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt1
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt20
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt8
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt46
-rw-r--r--kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt51
-rw-r--r--kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt6
-rw-r--r--kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt30
-rw-r--r--kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt9
-rw-r--r--kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt2
-rw-r--r--kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt4
-rw-r--r--kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt12
62 files changed, 690 insertions, 340 deletions
diff --git a/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt
index 5ba7acf9..2b4e91c0 100644
--- a/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
@@ -15,7 +15,7 @@ abstract class AbstractLincheckTest : VerifierState() {
open fun StressOptions.customize(isStressTest: Boolean): StressOptions = this
@Test
- fun modelCheckingTest() = ModelCheckingOptions()
+ open fun modelCheckingTest() = ModelCheckingOptions()
.iterations(if (isStressTest) 100 else 20)
.invocationsPerIteration(if (isStressTest) 10_000 else 1_000)
.commonConfiguration()
@@ -38,4 +38,4 @@ abstract class AbstractLincheckTest : VerifierState() {
.customize(isStressTest)
override fun extractState(): Any = error("Not implemented")
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt b/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt
index 55c05c55..c7c2c04e 100644
--- a/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/CancelledAwaitStressTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
@@ -52,4 +52,4 @@ class CancelledAwaitStressTest : TestBase() {
private fun keepMe(a: ByteArray) {
// does nothing, makes sure the variable is kept in state-machine
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt
new file mode 100644
index 00000000..dbe9cb37
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines
+
+import org.junit.Test
+import java.lang.Runnable
+import java.util.concurrent.*
+import kotlin.test.*
+
+class ExecutorAsCoroutineDispatcherDelayTest : TestBase() {
+
+ private var callsToSchedule = 0
+
+ private inner class STPE : ScheduledThreadPoolExecutor(1) {
+ override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> {
+ if (delay != 0L) ++callsToSchedule
+ return super.schedule(command, delay, unit)
+ }
+ }
+
+ private inner class SES : ScheduledExecutorService by STPE()
+
+ @Test
+ fun testScheduledThreadPool() = runTest {
+ val executor = STPE()
+ withContext(executor.asCoroutineDispatcher()) {
+ delay(100)
+ }
+ executor.shutdown()
+ assertEquals(1, callsToSchedule)
+ }
+
+ @Test
+ fun testScheduledExecutorService() = runTest {
+ val executor = SES()
+ withContext(executor.asCoroutineDispatcher()) {
+ delay(100)
+ }
+ executor.shutdown()
+ assertEquals(1, callsToSchedule)
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt b/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt
index 15cb83ce..8a7878c9 100644
--- a/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/FailFastOnStartTest.kt
@@ -70,8 +70,18 @@ class FailFastOnStartTest : TestBase() {
val actor = actor<Int>(Dispatchers.Main, start = CoroutineStart.LAZY) { fail() }
actor.send(1)
}
-
+
private fun mainException(e: Throwable): Boolean {
return e is IllegalStateException && e.message?.contains("Module with the Main dispatcher is missing") ?: false
}
+
+ @Test
+ fun testProduceNonChild() = runTest(expected = ::mainException) {
+ produce<Int>(Job() + Dispatchers.Main) { fail() }
+ }
+
+ @Test
+ fun testAsyncNonChild() = runTest(expected = ::mainException) {
+ async<Int>(Job() + Dispatchers.Main) { fail() }
+ }
}
diff --git a/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt b/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt
index c9f722a5..04b0ba54 100644
--- a/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt
@@ -33,7 +33,7 @@ class FailingCoroutinesMachineryTest(
private var caught: Throwable? = null
private val latch = CountDownLatch(1)
- private var exceptionHandler = CoroutineExceptionHandler { _, t -> caught = t;latch.countDown() }
+ private var exceptionHandler = CoroutineExceptionHandler { _, t -> caught = t; latch.countDown() }
private val lazyOuterDispatcher = lazy { newFixedThreadPoolContext(1, "") }
private object FailingUpdate : ThreadContextElement<Unit> {
@@ -115,14 +115,20 @@ class FailingCoroutinesMachineryTest(
@Test
fun testElement() = runTest {
- launch(NonCancellable + dispatcher.value + exceptionHandler + element) {}
+ // Top-level throwing dispatcher may rethrow an exception right here
+ runCatching {
+ launch(NonCancellable + dispatcher.value + exceptionHandler + element) {}
+ }
checkException()
}
@Test
fun testNestedElement() = runTest {
- launch(NonCancellable + dispatcher.value + exceptionHandler) {
- launch(element) { }
+ // Top-level throwing dispatcher may rethrow an exception right here
+ runCatching {
+ launch(NonCancellable + dispatcher.value + exceptionHandler) {
+ launch(element) { }
+ }
}
checkException()
}
diff --git a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt
index e8079ebd..c4232d6e 100644
--- a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt
+++ b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
@@ -56,7 +56,7 @@ object FieldWalker {
* Reflectively starts to walk through object graph and map to all the reached object to their path
* in from root. Use [showPath] do display a path if needed.
*/
- private fun walkRefs(root: Any?, rootStatics: Boolean): Map<Any, Ref> {
+ private fun walkRefs(root: Any?, rootStatics: Boolean): IdentityHashMap<Any, Ref> {
val visited = IdentityHashMap<Any, Ref>()
if (root == null) return visited
visited[root] = Ref.RootRef
diff --git a/kotlinx-coroutines-core/jvm/test/IntellijIdeaDebuggerEvaluatorCompatibilityTest.kt b/kotlinx-coroutines-core/jvm/test/IntellijIdeaDebuggerEvaluatorCompatibilityTest.kt
new file mode 100644
index 00000000..6bbfdd1b
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/IntellijIdeaDebuggerEvaluatorCompatibilityTest.kt
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines
+
+import org.junit.Test
+import kotlin.coroutines.*
+import kotlin.test.*
+
+class IntellijIdeaDebuggerEvaluatorCompatibilityTest {
+
+ /*
+ * This test verifies that our CoroutineScope is accessible to IDEA debugger.
+ *
+ * Consider the following scenario:
+ * ```
+ * runBlocking<Unit> { // this: CoroutineScope
+ * println("runBlocking")
+ * }
+ * ```
+ * user puts breakpoint to `println` line, opens "Evaluate" window
+ * and executes `launch { println("launch") }`. They (obviously) expect it to work, but
+ * it won't: `{}` in `runBlocking` is `SuspendLambda` and `this` is an unused implicit receiver
+ * that is removed by the compiler (because it's unused).
+ *
+ * But we still want to provide consistent user experience for functions with `CoroutineScope` receiver,
+ * for that IDEA debugger tries to retrieve the scope via `kotlin.coroutines.coroutineContext[Job] as? CoroutineScope`
+ * and with this test we're fixing this behaviour.
+ *
+ * Note that this behaviour is not carved in stone: IDEA fallbacks to `kotlin.coroutines.coroutineContext` for the context if necessary.
+ */
+
+ @Test
+ fun testScopeIsAccessible() = runBlocking<Unit> {
+ verify()
+
+ withContext(Job()) {
+ verify()
+ }
+
+ coroutineScope {
+ verify()
+ }
+
+ supervisorScope {
+ verify()
+ }
+
+ }
+
+ private suspend fun verify() {
+ val ctx = coroutineContext
+ assertTrue { ctx.job is CoroutineScope }
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/JoinStrTest.kt b/kotlinx-coroutines-core/jvm/test/JoinStressTest.kt
index 5090e7c0..6d474185 100644
--- a/kotlinx-coroutines-core/jvm/test/JoinStrTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/JoinStressTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt
new file mode 100644
index 00000000..8a20e084
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationLeakStressTest.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines
+
+import kotlinx.coroutines.channels.*
+import org.junit.Test
+import kotlin.test.*
+
+class ReusableCancellableContinuationLeakStressTest : TestBase() {
+
+ @Suppress("UnnecessaryVariable")
+ private suspend fun <T : Any> ReceiveChannel<T>.receiveBatch(): T {
+ val r = receive() // DO NOT MERGE LINES, otherwise TCE will kick in
+ return r
+ }
+
+ private val iterations = 100_000 * stressTestMultiplier
+
+ class Leak(val i: Int)
+
+ @Test // Simplified version of #2564
+ fun testReusableContinuationLeak() = runTest {
+ val channel = produce(capacity = 1) { // from the main thread
+ (0 until iterations).forEach {
+ send(Leak(it))
+ }
+ }
+
+ launch(Dispatchers.Default) {
+ repeat (iterations) {
+ val value = channel.receiveBatch()
+ assertEquals(it, value.i)
+ }
+ (channel as Job).join()
+
+ FieldWalker.assertReachableCount(0, coroutineContext.job, false) { it is Leak }
+ }
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt
index 56f1e283..06839f4a 100644
--- a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt
@@ -39,7 +39,7 @@ class ReusableCancellableContinuationTest : TestBase() {
repeat(iterations) {
suspender {
- assertTrue(channel.offer(it))
+ assertTrue(channel.trySend(it).isSuccess)
}
}
channel.close()
diff --git a/kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt
new file mode 100644
index 00000000..a256815d
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines
+
+import kotlinx.coroutines.flow.*
+import org.junit.*
+
+class ReusableContinuationStressTest : TestBase() {
+
+ private val iterations = 1000 * stressTestMultiplierSqrt
+
+ @Test // Originally reported by @denis-bezrukov in #2736
+ fun testDebounceWithStateFlow() = runBlocking<Unit> {
+ withContext(Dispatchers.Default) {
+ repeat(iterations) {
+ launch { // <- load the dispatcher and OS scheduler
+ runStressTestOnce(1, 1)
+ }
+ }
+ }
+ }
+
+ private suspend fun runStressTestOnce(delay: Int, debounce: Int) = coroutineScope {
+ val stateFlow = MutableStateFlow(0)
+ val emitter = launch {
+ repeat(1000) { i ->
+ stateFlow.emit(i)
+ delay(delay.toLong())
+ }
+ }
+ var last = 0
+ stateFlow.debounce(debounce.toLong()).take(100).collect { i ->
+ if (i - last > 100) {
+ last = i
+ }
+ }
+ emitter.cancel()
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt b/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt
index e20362ff..de38df6b 100644
--- a/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
@@ -171,4 +171,15 @@ class RunBlockingTest : TestBase() {
}
rb.hashCode() // unused
}
+
+ @Test
+ fun testCancelledParent() {
+ val job = Job()
+ job.cancel()
+ assertFailsWith<CancellationException> {
+ runBlocking(job) {
+ expectUnreached()
+ }
+ }
+ }
}
diff --git a/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt b/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt
index e755b17d..49c93c7f 100644
--- a/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/RunInterruptibleTest.kt
@@ -41,7 +41,7 @@ class RunInterruptibleTest : TestBase() {
val job = launch {
runInterruptible(Dispatchers.IO) {
expect(2)
- latch.offer(Unit)
+ latch.trySend(Unit)
try {
Thread.sleep(10_000L)
expectUnreached()
diff --git a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt
index ca399f53..bd9a185f 100644
--- a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt
+++ b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
@@ -42,7 +42,7 @@ private const val REAL_PARK_NANOS = 10_000_000L // 10 ms -- park for a little to
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
internal class VirtualTimeSource(
private val log: PrintStream?
-) : TimeSource {
+) : AbstractTimeSource() {
private val mainThread: Thread = Thread.currentThread()
private var checkpointNanos: Long = System.nanoTime()
@@ -142,7 +142,7 @@ internal class VirtualTimeSource(
}
private fun minParkedTill(): Long =
- threads.values.map { if (it.permit) NOT_PARKED else it.parkedTill }.min() ?: NOT_PARKED
+ threads.values.map { if (it.permit) NOT_PARKED else it.parkedTill }.minOrNull() ?: NOT_PARKED
@Synchronized
fun shutdown() {
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt
index ae95e694..d3b2ff12 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -78,4 +78,14 @@ class ActorLazyTest : TestBase() {
job.join()
finish(5)
}
-} \ No newline at end of file
+
+ @Test
+ fun testCancelledParent() = runTest({ it is CancellationException }) {
+ cancel()
+ expect(1)
+ actor<Int>(start = CoroutineStart.LAZY) {
+ expectUnreached()
+ }
+ finish(2)
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt
index bdca5039..5a2778d5 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -69,11 +69,11 @@ class ActorTest(private val capacity: Int) : TestBase() {
@Test
fun testCloseWithoutCause() = runTest {
val actor = actor<Int>(capacity = capacity) {
- val element = channel.receiveOrNull()
+ val element = channel.receive()
expect(2)
assertEquals(42, element)
- val next = channel.receiveOrNull()
- assertNull(next)
+ val next = channel.receiveCatching()
+ assertNull(next.exceptionOrNull())
expect(3)
}
@@ -88,11 +88,11 @@ class ActorTest(private val capacity: Int) : TestBase() {
@Test
fun testCloseWithCause() = runTest {
val actor = actor<Int>(capacity = capacity) {
- val element = channel.receiveOrNull()
+ val element = channel.receive()
expect(2)
- require(element!! == 42)
+ require(element == 42)
try {
- channel.receiveOrNull()
+ channel.receive()
} catch (e: IOException) {
expect(3)
}
@@ -111,7 +111,7 @@ class ActorTest(private val capacity: Int) : TestBase() {
val job = async {
actor<Int>(capacity = capacity) {
expect(1)
- channel.receiveOrNull()
+ channel.receive()
expectUnreached()
}
}
@@ -173,11 +173,24 @@ class ActorTest(private val capacity: Int) : TestBase() {
fun testCloseFreshActor() = runTest {
for (start in CoroutineStart.values()) {
val job = launch {
- val actor = actor<Int>(start = start) { for (i in channel) {} }
+ val actor = actor<Int>(start = start) {
+ for (i in channel) {
+ }
+ }
actor.close()
}
job.join()
}
}
+
+ @Test
+ fun testCancelledParent() = runTest({ it is CancellationException }) {
+ cancel()
+ expect(1)
+ actor<Int> {
+ expectUnreached()
+ }
+ finish(2)
+ }
}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt
index 2e73b243..8c9777b4 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -67,10 +67,10 @@ class BroadcastChannelMultiReceiveStressTest(
val channel = broadcast.openSubscription()
when (receiverIndex % 5) {
0 -> doReceive(channel, receiverIndex)
- 1 -> doReceiveOrNull(channel, receiverIndex)
+ 1 -> doReceiveCatching(channel, receiverIndex)
2 -> doIterator(channel, receiverIndex)
3 -> doReceiveSelect(channel, receiverIndex)
- 4 -> doReceiveSelectOrNull(channel, receiverIndex)
+ 4 -> doReceiveCatchingSelect(channel, receiverIndex)
}
channel.cancel()
}
@@ -124,9 +124,9 @@ class BroadcastChannelMultiReceiveStressTest(
}
}
- private suspend fun doReceiveOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) {
+ private suspend fun doReceiveCatching(channel: ReceiveChannel<Long>, receiverIndex: Int) {
while (true) {
- val stop = doReceived(receiverIndex, channel.receiveOrNull() ?: break)
+ val stop = doReceived(receiverIndex, channel.receiveCatching().getOrNull() ?: break)
if (stop) break
}
}
@@ -148,11 +148,11 @@ class BroadcastChannelMultiReceiveStressTest(
}
}
- private suspend fun doReceiveSelectOrNull(channel: ReceiveChannel<Long>, receiverIndex: Int) {
+ private suspend fun doReceiveCatchingSelect(channel: ReceiveChannel<Long>, receiverIndex: Int) {
while (true) {
- val event = select<Long?> { channel.onReceiveOrNull { it } } ?: break
+ val event = select<Long?> { channel.onReceiveCatching { it.getOrNull() } } ?: break
val stop = doReceived(receiverIndex, event)
if (stop) break
}
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt
index 76713aa1..86adfee0 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -15,7 +15,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
// total counters
private var sendCnt = 0
- private var offerFailedCnt = 0
+ private var trySendFailedCnt = 0
private var receivedCnt = 0
private var undeliveredCnt = 0
@@ -23,7 +23,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
private var lastReceived = 0
private var dSendCnt = 0
private var dSendExceptionCnt = 0
- private var dOfferFailedCnt = 0
+ private var dTrySendFailedCnt = 0
private var dReceivedCnt = 0
private val dUndeliveredCnt = AtomicInteger()
@@ -43,30 +43,30 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
joinAll(j1, j2)
// All elements must be either received or undelivered (IN every run)
- if (dSendCnt - dOfferFailedCnt != dReceivedCnt + dUndeliveredCnt.get()) {
+ if (dSendCnt - dTrySendFailedCnt != dReceivedCnt + dUndeliveredCnt.get()) {
println(" Send: $dSendCnt")
- println("Send Exception: $dSendExceptionCnt")
- println(" Offer failed: $dOfferFailedCnt")
+ println("Send exception: $dSendExceptionCnt")
+ println("trySend failed: $dTrySendFailedCnt")
println(" Received: $dReceivedCnt")
println(" Undelivered: ${dUndeliveredCnt.get()}")
error("Failed")
}
- offerFailedCnt += dOfferFailedCnt
+ trySendFailedCnt += dTrySendFailedCnt
receivedCnt += dReceivedCnt
undeliveredCnt += dUndeliveredCnt.get()
// clear for next run
dSendCnt = 0
dSendExceptionCnt = 0
- dOfferFailedCnt = 0
+ dTrySendFailedCnt = 0
dReceivedCnt = 0
dUndeliveredCnt.set(0)
}
// Stats
- println(" Send: $sendCnt")
- println(" Offer failed: $offerFailedCnt")
- println(" Received: $receivedCnt")
- println(" Undelivered: $undeliveredCnt")
- assertEquals(sendCnt - offerFailedCnt, receivedCnt + undeliveredCnt)
+ println(" Send: $sendCnt")
+ println("trySend failed: $trySendFailedCnt")
+ println(" Received: $receivedCnt")
+ println(" Undelivered: $undeliveredCnt")
+ assertEquals(sendCnt - trySendFailedCnt, receivedCnt + undeliveredCnt)
}
private suspend fun sendOne(channel: Channel<Int>) {
@@ -75,11 +75,11 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
try {
when (Random.nextInt(2)) {
0 -> channel.send(i)
- 1 -> if (!channel.offer(i)) {
- dOfferFailedCnt++
+ 1 -> if (!channel.trySend(i).isSuccess) {
+ dTrySendFailedCnt++
}
}
- } catch(e: Throwable) {
+ } catch (e: Throwable) {
assertTrue(e is CancellationException) // the only exception possible in this test
dSendExceptionCnt++
throw e
@@ -89,7 +89,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
private suspend fun receiveOne(channel: Channel<Int>) {
val received = when (Random.nextInt(3)) {
0 -> channel.receive()
- 1 -> channel.receiveOrNull() ?: error("Cannot be closed yet")
+ 1 -> channel.receiveCatching().getOrElse { error("Cannot be closed yet") }
2 -> select {
channel.onReceive { it }
}
@@ -99,4 +99,4 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() {
dReceivedCnt++
lastReceived = received
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt
index f414c333..a6345cc5 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -60,10 +60,10 @@ class ChannelSendReceiveStressTest(
launch(pool + CoroutineName("receiver$receiverIndex")) {
when (receiverIndex % 5) {
0 -> doReceive(receiverIndex)
- 1 -> doReceiveOrNull(receiverIndex)
+ 1 -> doReceiveCatching(receiverIndex)
2 -> doIterator(receiverIndex)
3 -> doReceiveSelect(receiverIndex)
- 4 -> doReceiveSelectOrNull(receiverIndex)
+ 4 -> doReceiveCatchingSelect(receiverIndex)
}
receiversCompleted.incrementAndGet()
}
@@ -152,9 +152,9 @@ class ChannelSendReceiveStressTest(
}
}
- private suspend fun doReceiveOrNull(receiverIndex: Int) {
+ private suspend fun doReceiveCatching(receiverIndex: Int) {
while (true) {
- doReceived(receiverIndex, channel.receiveOrNull() ?: break)
+ doReceived(receiverIndex, channel.receiveCatching().getOrNull() ?: break)
}
}
@@ -173,10 +173,10 @@ class ChannelSendReceiveStressTest(
}
}
- private suspend fun doReceiveSelectOrNull(receiverIndex: Int) {
+ private suspend fun doReceiveCatchingSelect(receiverIndex: Int) {
while (true) {
- val event = select<Int?> { channel.onReceiveOrNull { it } } ?: break
+ val event = select<Int?> { channel.onReceiveCatching { it.getOrNull() } } ?: break
doReceived(receiverIndex, event)
}
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt
index 1188329a..12334326 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -68,7 +68,7 @@ class ChannelUndeliveredElementStressTest(private val kind: TestChannelKind) : T
try {
block()
} finally {
- if (!done.offer(true))
+ if (!done.trySend(true).isSuccess)
error(IllegalStateException("failed to offer to done channel"))
}
}
@@ -188,9 +188,9 @@ class ChannelUndeliveredElementStressTest(private val kind: TestChannelKind) : T
val receivedData = when (receiveMode) {
1 -> channel.receive()
2 -> select { channel.onReceive { it } }
- 3 -> channel.receiveOrNull() ?: error("Should not be closed")
- 4 -> select { channel.onReceiveOrNull { it ?: error("Should not be closed") } }
- 5 -> channel.receiveOrClosed().value
+ 3 -> channel.receiveCatching().getOrElse { error("Should not be closed") }
+ 4 -> select { channel.onReceiveCatching { it.getOrElse { error("Should not be closed") } } }
+ 5 -> channel.receiveCatching().getOrThrow()
6 -> {
val iterator = channel.iterator()
check(iterator.hasNext()) { "Should not be closed" }
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt
index da20f0c5..8512aebc 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelsJvmTest.kt
@@ -11,7 +11,7 @@ import kotlin.test.*
class ChannelsJvmTest : TestBase() {
@Test
- fun testBlocking() {
+ fun testTrySendBlocking() {
val ch = Channel<Int>()
val sum = GlobalScope.async {
var sum = 0
@@ -19,9 +19,36 @@ class ChannelsJvmTest : TestBase() {
sum
}
repeat(10) {
- ch.sendBlocking(it)
+ assertTrue(ch.trySendBlocking(it).isSuccess)
}
ch.close()
assertEquals(45, runBlocking { sum.await() })
}
+
+ @Test
+ fun testTrySendBlockingClosedChannel() {
+ run {
+ val channel = Channel<Unit>().also { it.close() }
+ channel.trySendBlocking(Unit)
+ .onSuccess { expectUnreached() }
+ .onFailure { assertTrue(it is ClosedSendChannelException) }
+ .also { assertTrue { it.isClosed } }
+ }
+
+ run {
+ val channel = Channel<Unit>().also { it.close(TestException()) }
+ channel.trySendBlocking(Unit)
+ .onSuccess { expectUnreached() }
+ .onFailure { assertTrue(it is TestException) }
+ .also { assertTrue { it.isClosed } }
+ }
+
+ run {
+ val channel = Channel<Unit>().also { it.cancel(TestCancellationException()) }
+ channel.trySendBlocking(Unit)
+ .onSuccess { expectUnreached() }
+ .onFailure { assertTrue(it is TestCancellationException) }
+ .also { assertTrue { it.isClosed } }
+ }
+ }
}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt
index eb7be575..2b3c05bc 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ConflatedBroadcastChannelNotifyStressTest.kt
@@ -29,7 +29,7 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
launch(Dispatchers.Default + CoroutineName("Sender$senderId")) {
repeat(nEvents) { i ->
if (i % nSenders == senderId) {
- broadcast.offer(i)
+ broadcast.trySend(i)
sentTotal.incrementAndGet()
yield()
}
@@ -63,7 +63,7 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
try {
withTimeout(timeLimit) {
senders.forEach { it.join() }
- broadcast.offer(nEvents) // last event to signal receivers termination
+ broadcast.trySend(nEvents) // last event to signal receivers termination
receivers.forEach { it.join() }
}
} catch (e: CancellationException) {
@@ -86,4 +86,4 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
cancel()
value
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt
index 316b3785..793d7e44 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt
@@ -1,15 +1,12 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
import kotlinx.coroutines.*
-import org.junit.After
-import org.junit.Test
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.atomic.AtomicReference
-import kotlin.coroutines.*
+import org.junit.*
+import java.util.concurrent.atomic.*
class ConflatedChannelCloseStressTest : TestBase() {
@@ -37,12 +34,9 @@ class ConflatedChannelCloseStressTest : TestBase() {
var x = senderId
try {
while (isActive) {
- try {
- curChannel.get().offer(x)
+ curChannel.get().trySend(x).onSuccess {
x += nSenders
sent.incrementAndGet()
- } catch (e: ClosedSendChannelException) {
- // ignore
}
}
} finally {
@@ -64,7 +58,9 @@ class ConflatedChannelCloseStressTest : TestBase() {
}
val receiver = async(pool + NonCancellable) {
while (isActive) {
- curChannel.get().receiveOrNull()
+ curChannel.get().receiveCatching().getOrElse {
+ it?.let { throw it }
+ }
received.incrementAndGet()
}
}
@@ -110,4 +106,4 @@ class ConflatedChannelCloseStressTest : TestBase() {
}
class StopException : Exception()
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt
index 51789078..fbc28a18 100644
--- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -48,7 +48,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase()
delayChannel.cancel()
delay(5100)
- assertFailsWith<CancellationException> { delayChannel.poll() }
+ assertFailsWith<CancellationException> { delayChannel.tryReceive().getOrThrow() }
}
}
@@ -112,13 +112,13 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase()
var sum = 0
var n = 0
whileSelect {
- this@averageInTimeWindow.onReceiveOrClosed {
+ this@averageInTimeWindow.onReceiveCatching {
if (it.isClosed) {
// Send leftovers and bail out
if (n != 0) send(sum / n.toDouble())
false
} else {
- sum += it.value
+ sum += it.getOrThrow()
++n
true
}
@@ -159,9 +159,9 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase()
}
}
-fun ReceiveChannel<Unit>.checkEmpty() = assertNull(poll())
+fun ReceiveChannel<Unit>.checkEmpty() = assertNull(tryReceive().getOrNull())
fun ReceiveChannel<Unit>.checkNotEmpty() {
- assertNotNull(poll())
- assertNull(poll())
+ assertNotNull(tryReceive().getOrNull())
+ assertNull(tryReceive().getOrNull())
}
diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt
index f52f8b5b..2d8c0ebc 100644
--- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.exceptions
@@ -38,13 +38,6 @@ class StackTraceRecoveryChannelsTest : TestBase() {
}
@Test
- fun testReceiveOrNullFromClosedChannel() = runTest {
- val channel = Channel<Int>()
- channel.close(RecoverableTestException())
- channelReceiveOrNull(channel)
- }
-
- @Test
fun testSendToClosedChannel() = runTest {
val channel = Channel<Int>()
channel.close(RecoverableTestException())
@@ -67,7 +60,6 @@ class StackTraceRecoveryChannelsTest : TestBase() {
}
private suspend fun channelReceive(channel: Channel<Int>) = channelOp { channel.receive() }
- private suspend fun channelReceiveOrNull(channel: Channel<Int>) = channelOp { channel.receiveOrNull() }
private suspend inline fun channelOp(block: () -> Unit) {
try {
@@ -145,25 +137,6 @@ class StackTraceRecoveryChannelsTest : TestBase() {
deferred.await()
}
- // See https://github.com/Kotlin/kotlinx.coroutines/issues/950
- @Test
- fun testCancelledOffer() = runTest {
- expect(1)
- val job = Job()
- val actor = actor<Int>(job, Channel.UNLIMITED) {
- consumeEach {
- expectUnreached() // is cancelled before offer
- }
- }
- job.cancel()
- try {
- actor.offer(1)
- } catch (e: Exception) {
- verifyStackTrace("channels/${name.methodName}", e)
- finish(2)
- }
- }
-
private suspend fun Channel<Int>.sendWithContext(ctx: CoroutineContext) = withContext(ctx) {
sendInChannel()
yield() // TCE
@@ -177,4 +150,4 @@ class StackTraceRecoveryChannelsTest : TestBase() {
private suspend fun Channel<Int>.sendFromScope() = coroutineScope {
sendWithContext(wrapperDispatcher(coroutineContext))
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt
index 70336659..dba738a8 100644
--- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryCustomExceptionsTest.kt
@@ -5,6 +5,7 @@
package kotlinx.coroutines.exceptions
import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
import org.junit.Test
import kotlin.test.*
@@ -71,4 +72,56 @@ class StackTraceRecoveryCustomExceptionsTest : TestBase() {
assertEquals("custom", cause.message)
}
}
+
+ class WrongMessageException(token: String) : RuntimeException("Token $token")
+
+ @Test
+ fun testWrongMessageException() = runTest {
+ val result = runCatching {
+ coroutineScope<Unit> {
+ throw WrongMessageException("OK")
+ }
+ }
+ val ex = result.exceptionOrNull() ?: error("Expected to fail")
+ assertTrue(ex is WrongMessageException)
+ assertEquals("Token OK", ex.message)
+ }
+
+ @Test
+ fun testWrongMessageExceptionInChannel() = runTest {
+ val result = produce<Unit>(SupervisorJob() + Dispatchers.Unconfined) {
+ throw WrongMessageException("OK")
+ }
+ val ex = runCatching {
+ @Suppress("ControlFlowWithEmptyBody")
+ for (unit in result) {
+ // Iterator has a special code path
+ }
+ }.exceptionOrNull() ?: error("Expected to fail")
+ assertTrue(ex is WrongMessageException)
+ assertEquals("Token OK", ex.message)
+ }
+
+ class CopyableWithCustomMessage(
+ message: String?,
+ cause: Throwable? = null
+ ) : RuntimeException(message, cause),
+ CopyableThrowable<CopyableWithCustomMessage> {
+
+ override fun createCopy(): CopyableWithCustomMessage {
+ return CopyableWithCustomMessage("Recovered: [$message]", cause)
+ }
+ }
+
+ @Test
+ fun testCustomCopyableMessage() = runTest {
+ val result = runCatching {
+ coroutineScope<Unit> {
+ throw CopyableWithCustomMessage("OK")
+ }
+ }
+ val ex = result.exceptionOrNull() ?: error("Expected to fail")
+ assertTrue(ex is CopyableWithCustomMessage)
+ assertEquals("Recovered: [OK]", ex.message)
+ }
}
diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt
index bea18a43..a85bb7a2 100644
--- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedScopesTest.kt
@@ -86,7 +86,6 @@ class StackTraceRecoveryNestedScopesTest : TestBase() {
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$callWithTimeout\$2.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:37)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$callCoroutineScope\$2.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:43)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$testAwaitNestedScopes\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:68)\n" +
- "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest.verifyAwait(StackTraceRecoveryNestedScopesTest.kt:76)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryNestedScopesTest\$testAwaitNestedScopes\$1.invokeSuspend(StackTraceRecoveryNestedScopesTest.kt:71)\n" +
"Caused by: kotlinx.coroutines.RecoverableTestException\n" +
diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt
index 5073b7fd..02607c03 100644
--- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryNestedTest.kt
@@ -58,7 +58,7 @@ class StackTraceRecoveryNestedTest : TestBase() {
try {
rootAsync.awaitRootLevel()
} catch (e: RecoverableTestException) {
- e.verifyException("await\$suspendImpl", "awaitRootLevel")
+ e.verifyException("awaitRootLevel")
finish(8)
}
}
diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt
index 290420e4..0d7648c5 100644
--- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoverySelectTest.kt
@@ -45,9 +45,9 @@ class StackTraceRecoverySelectTest : TestBase() {
private suspend fun doSelectAwait(deferred: Deferred<Unit>): Int {
return select {
deferred.onAwait {
- yield() // Hide the stackstrace
+ yield() // Hide the frame
42
}
}
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt
index dbbd77c4..0a8b6530 100644
--- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt
@@ -34,14 +34,13 @@ class StackTraceRecoveryTest : TestBase() {
val deferred = createDeferred(3)
val traces = listOf(
"java.util.concurrent.ExecutionException\n" +
- "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$1\$1.invokeSuspend(StackTraceRecoveryTest.kt:99)\n" +
+ "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$createDeferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:99)\n" +
"\t(Coroutine boundary)\n" +
- "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.oneMoreNestedMethod(StackTraceRecoveryTest.kt:49)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.nestedMethod(StackTraceRecoveryTest.kt:44)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1.invokeSuspend(StackTraceRecoveryTest.kt:17)\n",
"Caused by: java.util.concurrent.ExecutionException\n" +
- "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$1\$1.invokeSuspend(StackTraceRecoveryTest.kt:21)\n" +
+ "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testAsync\$1\$createDeferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:21)\n" +
"\tat kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)\n"
)
nestedMethod(deferred, *traces.toTypedArray())
@@ -59,7 +58,6 @@ class StackTraceRecoveryTest : TestBase() {
"java.util.concurrent.ExecutionException\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testCompletedAsync\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:44)\n" +
"\t(Coroutine boundary)\n" +
- "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.oneMoreNestedMethod(StackTraceRecoveryTest.kt:81)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.nestedMethod(StackTraceRecoveryTest.kt:75)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testCompletedAsync\$1.invokeSuspend(StackTraceRecoveryTest.kt:71)",
@@ -94,7 +92,6 @@ class StackTraceRecoveryTest : TestBase() {
"kotlinx.coroutines.RecoverableTestException\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testWithContext\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:143)\n" +
"\t(Coroutine boundary)\n" +
- "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.innerMethod(StackTraceRecoveryTest.kt:158)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$outerMethod\$2.invokeSuspend(StackTraceRecoveryTest.kt:151)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.outerMethod(StackTraceRecoveryTest.kt:150)\n" +
@@ -132,7 +129,6 @@ class StackTraceRecoveryTest : TestBase() {
"kotlinx.coroutines.RecoverableTestException\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testCoroutineScope\$1\$deferred\$1.invokeSuspend(StackTraceRecoveryTest.kt:143)\n" +
"\t(Coroutine boundary)\n" +
- "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.innerMethod(StackTraceRecoveryTest.kt:158)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$outerScopedMethod\$2\$1.invokeSuspend(StackTraceRecoveryTest.kt:193)\n" +
"\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$outerScopedMethod\$2.invokeSuspend(StackTraceRecoveryTest.kt:151)\n" +
@@ -228,13 +224,13 @@ class StackTraceRecoveryTest : TestBase() {
val e = exception
assertNotNull(e)
verifyStackTrace(e, "kotlinx.coroutines.RecoverableTestException\n" +
- "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.throws(StackTraceRecoveryTest.kt:280)\n" +
- "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$throws\$1.invokeSuspend(StackTraceRecoveryTest.kt)\n" +
- "\t(Coroutine boundary)\n" +
- "\tat kotlinx.coroutines.DeferredCoroutine.await\$suspendImpl(Builders.common.kt:99)\n" +
- "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.awaiter(StackTraceRecoveryTest.kt:285)\n" +
- "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testNonDispatchedRecovery\$await\$1.invokeSuspend(StackTraceRecoveryTest.kt:291)\n" +
- "Caused by: kotlinx.coroutines.RecoverableTestException")
+ "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.throws(StackTraceRecoveryTest.kt:280)\n" +
+ "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.access\$throws(StackTraceRecoveryTest.kt:20)\n" +
+ "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$throws\$1.invokeSuspend(StackTraceRecoveryTest.kt)\n" +
+ "\t(Coroutine boundary)\n" +
+ "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest.awaiter(StackTraceRecoveryTest.kt:285)\n" +
+ "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testNonDispatchedRecovery\$await\$1.invokeSuspend(StackTraceRecoveryTest.kt:291)\n" +
+ "Caused by: kotlinx.coroutines.RecoverableTestException")
}
private class Callback(val cont: CancellableContinuation<*>)
@@ -261,22 +257,8 @@ class StackTraceRecoveryTest : TestBase() {
private suspend fun awaitCallback(channel: Channel<Callback>) {
suspendCancellableCoroutine<Unit> { cont ->
- channel.offer(Callback(cont))
+ channel.trySend(Callback(cont))
}
yield() // nop to make sure it is not a tail call
}
-
- @Test
- fun testWrongMessageException() = runTest {
- val result = runCatching {
- coroutineScope<Unit> {
- throw WrongMessageException("OK")
- }
- }
- val ex = result.exceptionOrNull() ?: error("Expected to fail")
- assertTrue(ex is WrongMessageException)
- assertEquals("Token OK", ex.message)
- }
-
- public class WrongMessageException(token: String) : RuntimeException("Token $token")
}
diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt b/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt
index 6034fccb..edce175d 100644
--- a/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt
+++ b/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.exceptions
@@ -15,8 +15,8 @@ class SuppressionTests : TestBase() {
@Test
fun testNotificationsWithException() = runTest {
expect(1)
- val coroutineContext = kotlin.coroutines.coroutineContext // workaround for KT-22984
- val coroutine = object : AbstractCoroutine<String>(coroutineContext, false) {
+ val coroutineContext = kotlin.coroutines.coroutineContext + NonCancellable // workaround for KT-22984
+ val coroutine = object : AbstractCoroutine<String>(coroutineContext, true, false) {
override fun onStart() {
expect(3)
}
@@ -82,4 +82,4 @@ class SuppressionTests : TestBase() {
assertTrue(e.cause!!.suppressed.isEmpty())
}
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
index e3db2626..f1be284c 100644
--- a/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/flow/CallbackFlowTest.kt
@@ -36,7 +36,7 @@ class CallbackFlowTest : TestBase() {
fun testThrowingConsumer() = runTest {
var i = 0
val api = CallbackApi {
- runCatching { it.offer(++i) }
+ it.trySend(++i)
}
val flow = callbackFlow<Int> {
@@ -77,13 +77,13 @@ class CallbackFlowTest : TestBase() {
var i = 0
val api = CallbackApi {
if (i < 5) {
- it.offer(++i)
+ it.trySend(++i)
} else {
it.close(RuntimeException())
}
}
- val flow = callbackFlow<Int>() {
+ val flow = callbackFlow<Int> {
api.start(channel)
awaitClose {
api.stop()
diff --git a/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt b/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt
new file mode 100644
index 00000000..98240fc9
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/flow/SharingReferenceTest.kt
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.internal.*
+import org.junit.*
+
+/**
+ * Tests that shared flows keep strong reference to their source flows.
+ * See https://github.com/Kotlin/kotlinx.coroutines/issues/2557
+ */
+@OptIn(DelicateCoroutinesApi::class)
+class SharingReferenceTest : TestBase() {
+ private val token = object {}
+
+ /*
+ * Single-threaded executor that we are using to ensure that the flow being sharing actually
+ * suspended (spilled its locals, attached to parent), so we can verify reachability.
+ * Without that, it's possible to have a situation where target flow is still
+ * being strongly referenced (by its dispatcher), but the test already tries to test reachability and fails.
+ */
+ @get:Rule
+ val executor = ExecutorRule(1)
+
+ private val weakEmitter = flow {
+ emit(null)
+ // suspend forever without keeping a strong reference to continuation -- this is a model of
+ // a callback API that does not keep a strong reference it is listeners, but works
+ suspendCancellableCoroutine<Unit> { }
+ // using the token here to make it easily traceable
+ emit(token)
+ }
+
+ @Test
+ fun testShareInReference() {
+ val flow = weakEmitter.shareIn(ContextScope(executor), SharingStarted.Eagerly, 0)
+ linearize()
+ FieldWalker.assertReachableCount(1, flow) { it === token }
+ }
+
+ @Test
+ fun testStateInReference() {
+ val flow = weakEmitter.stateIn(ContextScope(executor), SharingStarted.Eagerly, null)
+ linearize()
+ FieldWalker.assertReachableCount(1, flow) { it === token }
+ }
+
+ @Test
+ fun testStateInSuspendingReference() = runTest {
+ val flow = weakEmitter.stateIn(GlobalScope)
+ linearize()
+ FieldWalker.assertReachableCount(1, flow) { it === token }
+ }
+
+ private fun linearize() {
+ runBlocking(executor) { }
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt
index dc3cd43c..e55eaad1 100644
--- a/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt
@@ -62,7 +62,7 @@ class StateFlowStressTest : TestBase() {
for (second in 1..nSeconds) {
delay(1000)
val cs = collected.map { it.sum() }
- println("$second: emitted=${emitted.sum()}, collected=${cs.min()}..${cs.max()}")
+ println("$second: emitted=${emitted.sum()}, collected=${cs.minOrNull()}..${cs.maxOrNull()}")
}
emitters.cancelAndJoin()
collectors.cancelAndJoin()
@@ -77,4 +77,4 @@ class StateFlowStressTest : TestBase() {
@Test
fun testTenEmittersAndCollectors() = stress(10, 10)
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt
new file mode 100644
index 00000000..660ed0aa
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/flow/StateFlowUpdateStressTest.kt
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import org.junit.*
+import kotlin.test.*
+import kotlin.test.Test
+
+class StateFlowUpdateStressTest : TestBase() {
+ private val iterations = 1_000_000 * stressTestMultiplier
+
+ @get:Rule
+ public val executor = ExecutorRule(2)
+
+ @Test
+ fun testUpdate() = doTest { update { it + 1 } }
+
+ @Test
+ fun testUpdateAndGet() = doTest { updateAndGet { it + 1 } }
+
+ @Test
+ fun testGetAndUpdate() = doTest { getAndUpdate { it + 1 } }
+
+ private fun doTest(increment: MutableStateFlow<Int>.() -> Unit) = runTest {
+ val flow = MutableStateFlow(0)
+ val j1 = launch(Dispatchers.Default) {
+ repeat(iterations / 2) {
+ flow.increment()
+ }
+ }
+
+ val j2 = launch(Dispatchers.Default) {
+ repeat(iterations / 2) {
+ flow.increment()
+ }
+ }
+
+ joinAll(j1, j2)
+ assertEquals(iterations, flow.value)
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt
index f04b100a..529f8817 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt
@@ -7,11 +7,10 @@ package kotlinx.coroutines.guide.exampleBasic01
import kotlinx.coroutines.*
-fun main() {
- GlobalScope.launch { // launch a new coroutine in background and continue
+fun main() = runBlocking { // this: CoroutineScope
+ launch { // launch a new coroutine and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
- println("Hello,") // main thread continues while coroutine is delayed
- Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
+ println("Hello") // main coroutine continues while a previous one is delayed
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt
index bfece26b..6bf2af4c 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-02.kt
@@ -7,13 +7,13 @@ package kotlinx.coroutines.guide.exampleBasic02
import kotlinx.coroutines.*
-fun main() {
- GlobalScope.launch { // launch a new coroutine in background and continue
- delay(1000L)
- println("World!")
- }
- println("Hello,") // main thread continues here immediately
- runBlocking { // but this expression blocks the main thread
- delay(2000L) // ... while we delay for 2 seconds to keep JVM alive
- }
+fun main() = runBlocking { // this: CoroutineScope
+ launch { doWorld() }
+ println("Hello")
+}
+
+// this is your first suspending function
+suspend fun doWorld() {
+ delay(1000L)
+ println("World!")
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt
index 8541f604..67b6894a 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt
@@ -7,11 +7,14 @@ package kotlinx.coroutines.guide.exampleBasic03
import kotlinx.coroutines.*
-fun main() = runBlocking<Unit> { // start main coroutine
- GlobalScope.launch { // launch a new coroutine in background and continue
+fun main() = runBlocking {
+ doWorld()
+}
+
+suspend fun doWorld() = coroutineScope { // this: CoroutineScope
+ launch {
delay(1000L)
println("World!")
}
- println("Hello,") // main coroutine continues here immediately
- delay(2000L) // delaying for 2 seconds to keep JVM alive
+ println("Hello")
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt
index 69f82771..efac7085 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-04.kt
@@ -7,11 +7,21 @@ package kotlinx.coroutines.guide.exampleBasic04
import kotlinx.coroutines.*
+// Sequentially executes doWorld followed by "Done"
fun main() = runBlocking {
- val job = GlobalScope.launch { // launch a new coroutine and keep a reference to its Job
+ doWorld()
+ println("Done")
+}
+
+// Concurrently executes both sections
+suspend fun doWorld() = coroutineScope { // this: CoroutineScope
+ launch {
+ delay(2000L)
+ println("World 2")
+ }
+ launch {
delay(1000L)
- println("World!")
+ println("World 1")
}
- println("Hello,")
- job.join() // wait until child coroutine completes
+ println("Hello")
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt
index 9d530b5f..193f2cc3 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-05.kt
@@ -7,10 +7,12 @@ package kotlinx.coroutines.guide.exampleBasic05
import kotlinx.coroutines.*
-fun main() = runBlocking { // this: CoroutineScope
- launch { // launch a new coroutine in the scope of runBlocking
+fun main() = runBlocking {
+ val job = launch { // launch a new coroutine and keep a reference to its Job
delay(1000L)
println("World!")
}
- println("Hello,")
+ println("Hello")
+ job.join() // wait until child coroutine completes
+ println("Done")
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt
index b53d3b89..24b890a0 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-06.kt
@@ -7,21 +7,11 @@ package kotlinx.coroutines.guide.exampleBasic06
import kotlinx.coroutines.*
-fun main() = runBlocking { // this: CoroutineScope
- launch {
- delay(200L)
- println("Task from runBlocking")
- }
-
- coroutineScope { // Creates a coroutine scope
+fun main() = runBlocking {
+ repeat(100_000) { // launch a lot of coroutines
launch {
- delay(500L)
- println("Task from nested launch")
+ delay(5000L)
+ print(".")
}
-
- delay(100L)
- println("Task from coroutine scope") // This line will be printed before the nested launch
}
-
- println("Coroutine scope is over") // This line is not printed until the nested launch completes
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt
deleted file mode 100644
index cd854ce8..00000000
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit.
-package kotlinx.coroutines.guide.exampleBasic07
-
-import kotlinx.coroutines.*
-
-fun main() = runBlocking {
- launch { doWorld() }
- println("Hello,")
-}
-
-// this is your first suspending function
-suspend fun doWorld() {
- delay(1000L)
- println("World!")
-}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-08.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-08.kt
deleted file mode 100644
index 0a346e0b..00000000
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-08.kt
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit.
-package kotlinx.coroutines.guide.exampleBasic08
-
-import kotlinx.coroutines.*
-
-fun main() = runBlocking {
- repeat(100_000) { // launch a lot of coroutines
- launch {
- delay(5000L)
- print(".")
- }
- }
-}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-09.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-09.kt
deleted file mode 100644
index c9783ee5..00000000
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-09.kt
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-// This file was automatically generated from coroutines-basics.md by Knit tool. Do not edit.
-package kotlinx.coroutines.guide.exampleBasic09
-
-import kotlinx.coroutines.*
-
-fun main() = runBlocking {
- GlobalScope.launch {
- repeat(1000) { i ->
- println("I'm sleeping $i ...")
- delay(500L)
- }
- }
- delay(1300L) // just quit after delay
-}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt
index 312dc72b..35536a7d 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt
@@ -23,10 +23,12 @@ fun main() {
println("Completed in $time ms")
}
+@OptIn(DelicateCoroutinesApi::class)
fun somethingUsefulOneAsync() = GlobalScope.async {
doSomethingUsefulOne()
}
+@OptIn(DelicateCoroutinesApi::class)
fun somethingUsefulTwoAsync() = GlobalScope.async {
doSomethingUsefulTwo()
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt
index e23eaf25..c6ad4516 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt
@@ -10,9 +10,9 @@ import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
// launch a coroutine to process some kind of incoming request
val request = launch {
- // it spawns two other jobs, one with GlobalScope
- GlobalScope.launch {
- println("job1: I run in GlobalScope and execute independently!")
+ // it spawns two other jobs
+ launch(Job()) {
+ println("job1: I run in my own Job and execute independently!")
delay(1000)
println("job1: I am not affected by cancellation of the request")
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt
index e08ddd08..24cbabe0 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-01.kt
@@ -7,6 +7,7 @@ package kotlinx.coroutines.guide.exampleExceptions01
import kotlinx.coroutines.*
+@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
val job = GlobalScope.launch { // root coroutine with launch
println("Throwing exception from launch")
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt
index 67fdaa71..c3ab68a5 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt
@@ -7,6 +7,7 @@ package kotlinx.coroutines.guide.exampleExceptions02
import kotlinx.coroutines.*
+@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt
index 9c9b43d2..b966c1ea 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt
@@ -7,6 +7,7 @@ package kotlinx.coroutines.guide.exampleExceptions04
import kotlinx.coroutines.*
+@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt
index 04f9385f..5f1f3d89 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-05.kt
@@ -10,6 +10,7 @@ import kotlinx.coroutines.exceptions.*
import kotlinx.coroutines.*
import java.io.*
+@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}")
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt
index 5a5b276b..bc9f77b9 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt
@@ -8,6 +8,7 @@ package kotlinx.coroutines.guide.exampleExceptions06
import kotlinx.coroutines.*
import java.io.*
+@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt
index 57fe6382..22380d3a 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt
@@ -11,17 +11,21 @@ import kotlinx.coroutines.selects.*
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
select<String> {
- a.onReceiveOrNull { value ->
- if (value == null)
- "Channel 'a' is closed"
- else
+ a.onReceiveCatching { it ->
+ val value = it.getOrNull()
+ if (value != null) {
"a -> '$value'"
+ } else {
+ "Channel 'a' is closed"
+ }
}
- b.onReceiveOrNull { value ->
- if (value == null)
- "Channel 'b' is closed"
- else
+ b.onReceiveCatching { it ->
+ val value = it.getOrNull()
+ if (value != null) {
"b -> '$value'"
+ } else {
+ "Channel 'b' is closed"
+ }
}
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt
index 464e9b20..68b44564 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt
@@ -13,12 +13,12 @@ fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) =
var current = input.receive() // start with first received deferred value
while (isActive) { // loop while not cancelled/closed
val next = select<Deferred<String>?> { // return next deferred value from this select or null
- input.onReceiveOrNull { update ->
- update // replaces next value to wait
+ input.onReceiveCatching { update ->
+ update.getOrNull()
}
- current.onAwait { value ->
+ current.onAwait { value ->
send(value) // send value that current deferred has produced
- input.receiveOrNull() // and use the next deferred from the input channel
+ input.receiveCatching().getOrNull() // and use the next deferred from the input channel
}
}
if (next == null) {
diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt
index 765cd0b9..7e54fb1d 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/test/BasicsGuideTest.kt
@@ -12,7 +12,7 @@ class BasicsGuideTest {
@Test
fun testExampleBasic01() {
test("ExampleBasic01") { kotlinx.coroutines.guide.exampleBasic01.main() }.verifyLines(
- "Hello,",
+ "Hello",
"World!"
)
}
@@ -20,7 +20,7 @@ class BasicsGuideTest {
@Test
fun testExampleBasic02() {
test("ExampleBasic02") { kotlinx.coroutines.guide.exampleBasic02.main() }.verifyLines(
- "Hello,",
+ "Hello",
"World!"
)
}
@@ -28,7 +28,7 @@ class BasicsGuideTest {
@Test
fun testExampleBasic03() {
test("ExampleBasic03") { kotlinx.coroutines.guide.exampleBasic03.main() }.verifyLines(
- "Hello,",
+ "Hello",
"World!"
)
}
@@ -36,50 +36,26 @@ class BasicsGuideTest {
@Test
fun testExampleBasic04() {
test("ExampleBasic04") { kotlinx.coroutines.guide.exampleBasic04.main() }.verifyLines(
- "Hello,",
- "World!"
+ "Hello",
+ "World 1",
+ "World 2",
+ "Done"
)
}
@Test
fun testExampleBasic05() {
test("ExampleBasic05") { kotlinx.coroutines.guide.exampleBasic05.main() }.verifyLines(
- "Hello,",
- "World!"
+ "Hello",
+ "World!",
+ "Done"
)
}
@Test
fun testExampleBasic06() {
- test("ExampleBasic06") { kotlinx.coroutines.guide.exampleBasic06.main() }.verifyLines(
- "Task from coroutine scope",
- "Task from runBlocking",
- "Task from nested launch",
- "Coroutine scope is over"
- )
- }
-
- @Test
- fun testExampleBasic07() {
- test("ExampleBasic07") { kotlinx.coroutines.guide.exampleBasic07.main() }.verifyLines(
- "Hello,",
- "World!"
- )
- }
-
- @Test
- fun testExampleBasic08() {
- test("ExampleBasic08") { kotlinx.coroutines.guide.exampleBasic08.main() }.also { lines ->
+ test("ExampleBasic06") { kotlinx.coroutines.guide.exampleBasic06.main() }.also { lines ->
check(lines.size == 1 && lines[0] == ".".repeat(100_000))
}
}
-
- @Test
- fun testExampleBasic09() {
- test("ExampleBasic09") { kotlinx.coroutines.guide.exampleBasic09.main() }.verifyLines(
- "I'm sleeping 0 ...",
- "I'm sleeping 1 ...",
- "I'm sleeping 2 ..."
- )
- }
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt
index d6f1c21d..1a84fb94 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt
@@ -57,7 +57,7 @@ class DispatcherGuideTest {
@Test
fun testExampleContext06() {
test("ExampleContext06") { kotlinx.coroutines.guide.exampleContext06.main() }.verifyLines(
- "job1: I run in GlobalScope and execute independently!",
+ "job1: I run in my own Job and execute independently!",
"job2: I am a child of the request coroutine",
"job1: I am not affected by cancellation of the request",
"main: Who has survived request cancellation?"
diff --git a/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt b/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt
new file mode 100644
index 00000000..30fbfee2
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines // Trick to make guide tests use these declarations with executors that can be closed on our side implicitly
+
+import java.util.concurrent.*
+import java.util.concurrent.atomic.*
+import kotlin.coroutines.*
+
+internal fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher = ClosedAfterGuideTestDispatcher(1, name)
+
+internal fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher =
+ ClosedAfterGuideTestDispatcher(nThreads, name)
+
+internal class PoolThread(
+ @JvmField val dispatcher: ExecutorCoroutineDispatcher, // for debugging & tests
+ target: Runnable, name: String
+) : Thread(target, name) {
+ init {
+ isDaemon = true
+ }
+}
+
+private class ClosedAfterGuideTestDispatcher(
+ private val nThreads: Int,
+ private val name: String
+) : ExecutorCoroutineDispatcher() {
+ private val threadNo = AtomicInteger()
+
+ override val executor: Executor =
+ Executors.newScheduledThreadPool(nThreads, object : ThreadFactory {
+ override fun newThread(target: java.lang.Runnable): Thread {
+ return PoolThread(
+ this@ClosedAfterGuideTestDispatcher,
+ target,
+ if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet()
+ )
+ }
+ })
+
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ executor.execute(wrapTask(block))
+ }
+
+ override fun close() {
+ (executor as ExecutorService).shutdown()
+ }
+
+ override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]"
+}
diff --git a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt
index 7eda9043..2e61ec6b 100644
--- a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt
+++ b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.knit
@@ -11,8 +11,6 @@ import kotlinx.knit.test.*
import java.util.concurrent.*
import kotlin.test.*
-fun wrapTask(block: Runnable) = kotlinx.coroutines.wrapTask(block)
-
// helper function to dump exception to stdout for ease of debugging failed tests
private inline fun <T> outputException(name: String, block: () -> T): T =
try { block() }
@@ -176,4 +174,4 @@ private inline fun List<String>.verify(verification: () -> Unit) {
}
throw t
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt
index fbd5c0d8..74cc1783 100644
--- a/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/lincheck/ChannelsLincheckTest.kt
@@ -63,11 +63,12 @@ abstract class ChannelLincheckTestBase(
}
@Operation
- fun offer(@Param(name = "value") value: Int): Any = try {
- c.offer(value)
- } catch (e: NumberedCancellationException) {
- e.testResult
- }
+ fun trySend(@Param(name = "value") value: Int): Any = c.trySend(value)
+ .onSuccess { return true }
+ .onFailure {
+ return if (it is NumberedCancellationException) it.testResult
+ else false
+ }
// TODO: this operation should be (and can be!) linearizable, but is not
// @Operation
@@ -85,11 +86,10 @@ abstract class ChannelLincheckTestBase(
}
@Operation
- fun poll(): Any? = try {
- c.poll()
- } catch (e: NumberedCancellationException) {
- e.testResult
- }
+ fun tryReceive(): Any? =
+ c.tryReceive()
+ .onSuccess { return it }
+ .onFailure { return if (it is NumberedCancellationException) it.testResult else null }
// TODO: this operation should be (and can be!) linearizable, but is not
// @Operation
@@ -131,7 +131,7 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta
private val buffer = ArrayList<Int>()
private var closedMessage: String? = null
- suspend fun send(x: Int): Any = when (val offerRes = offer(x)) {
+ suspend fun send(x: Int): Any = when (val offerRes = trySend(x)) {
true -> Unit
false -> suspendCancellableCoroutine { cont ->
senders.add(cont to x)
@@ -139,7 +139,7 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta
else -> offerRes
}
- fun offer(element: Int): Any {
+ fun trySend(element: Int): Any {
if (closedMessage !== null) return closedMessage!!
if (capacity == CONFLATED) {
if (resumeFirstReceiver(element)) return true
@@ -163,11 +163,11 @@ abstract class SequentialIntChannelBase(private val capacity: Int) : VerifierSta
return false
}
- suspend fun receive(): Any = poll() ?: suspendCancellableCoroutine { cont ->
+ suspend fun receive(): Any = tryReceive() ?: suspendCancellableCoroutine { cont ->
receivers.add(cont)
}
- fun poll(): Any? {
+ fun tryReceive(): Any? {
if (buffer.isNotEmpty()) {
val el = buffer.removeAt(0)
resumeFirstSender().also {
@@ -221,4 +221,4 @@ private fun <T> CancellableContinuation<T>.resume(res: T): Boolean {
val token = tryResume(res) ?: return false
completeResume(token)
return true
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt
index 6e350660..f7f59eef 100644
--- a/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("unused")
package kotlinx.coroutines.lincheck
@@ -9,10 +9,15 @@ import kotlinx.coroutines.sync.*
import org.jetbrains.kotlinx.lincheck.*
import org.jetbrains.kotlinx.lincheck.annotations.Operation
import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.*
+import org.junit.*
class MutexLincheckTest : AbstractLincheckTest() {
private val mutex = Mutex()
+ override fun modelCheckingTest() {
+ // Ignored via empty body as the only way
+ }
+
@Operation
fun tryLock() = mutex.tryLock()
@@ -29,4 +34,4 @@ class MutexLincheckTest : AbstractLincheckTest() {
checkObstructionFreedom()
override fun extractState() = mutex.isLocked
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt
index 84ce773c..2b471d7f 100644
--- a/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt
@@ -16,7 +16,7 @@ abstract class SemaphoreLincheckTestBase(permits: Int) : AbstractLincheckTest()
@Operation
fun tryAcquire() = semaphore.tryAcquire()
- @Operation(promptCancellation = true)
+ @Operation(promptCancellation = true, allowExtraSuspension = true)
suspend fun acquire() = semaphore.acquire()
@Operation(handleExceptionsAsResult = [IllegalStateException::class])
diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt
index f31752c8..fe09440f 100644
--- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt
@@ -101,7 +101,7 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() {
firstBarrier.await()
secondBarrier.await()
blockingTasks.joinAll()
- checkPoolThreadsCreated(21..22)
+ checkPoolThreadsCreated(21 /* blocking tasks + 1 for CPU */..20 + CORES_COUNT)
}
@Test
@@ -122,7 +122,7 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() {
barrier.await()
blockingTasks.joinAll()
// There may be race when multiple CPU threads are trying to lazily created one more
- checkPoolThreadsCreated(104..120)
+ checkPoolThreadsCreated(101..100 + CORES_COUNT)
}
@Test
diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt
index bfabf5b2..dd969bdd 100644
--- a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt
+++ b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt
@@ -39,17 +39,9 @@ abstract class SchedulerTestBase : TestBase() {
)
}
- /**
- * Asserts that any number of pool worker threads in [range] exists at the time of method invocation
- */
- fun checkPoolThreadsExist(range: IntRange) {
- val threads = Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker }.count()
- assertTrue(threads in range, "Expected threads in $range interval, but has $threads")
- }
-
private fun maxSequenceNumber(): Int? {
return Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker }
- .map { sequenceNumber(it.name) }.max()
+ .map { sequenceNumber(it.name) }.maxOrNull()
}
private fun sequenceNumber(threadName: String): Int {
@@ -105,4 +97,4 @@ abstract class SchedulerTestBase : TestBase() {
}
}
}
-} \ No newline at end of file
+}