aboutsummaryrefslogtreecommitdiffstats
path: root/kotlinx-coroutines-core/common/test
diff options
context:
space:
mode:
Diffstat (limited to 'kotlinx-coroutines-core/common/test')
-rw-r--r--kotlinx-coroutines-core/common/test/AbstractCoroutineTest.kt8
-rw-r--r--kotlinx-coroutines-core/common/test/CancelledParentAttachTest.kt85
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt16
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt34
-rw-r--r--kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt114
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ChannelBufferOverflowTest.kt26
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt146
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ChannelReceiveOrClosedTest.kt135
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt36
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt6
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt24
-rw-r--r--kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt14
-rw-r--r--kotlinx-coroutines-core/common/test/channels/ProduceTest.kt6
-rw-r--r--kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt28
-rw-r--r--kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt15
-rw-r--r--kotlinx-coroutines-core/common/test/flow/VirtualTime.kt2
-rw-r--r--kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt139
-rw-r--r--kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt22
-rw-r--r--kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt4
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt23
-rw-r--r--kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt7
-rw-r--r--kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt4
-rw-r--r--kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt6
-rw-r--r--kotlinx-coroutines-core/common/test/flow/sharing/StateFlowTest.kt26
-rw-r--r--kotlinx-coroutines-core/common/test/flow/terminal/LastTest.kt45
-rw-r--r--kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt28
-rw-r--r--kotlinx-coroutines-core/common/test/selects/SelectLoopTest.kt6
-rw-r--r--kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt32
28 files changed, 498 insertions, 539 deletions
diff --git a/kotlinx-coroutines-core/common/test/AbstractCoroutineTest.kt b/kotlinx-coroutines-core/common/test/AbstractCoroutineTest.kt
index ce20837e..ebe88ce1 100644
--- a/kotlinx-coroutines-core/common/test/AbstractCoroutineTest.kt
+++ b/kotlinx-coroutines-core/common/test/AbstractCoroutineTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
@@ -13,7 +13,7 @@ class AbstractCoroutineTest : TestBase() {
fun testNotifications() = runTest {
expect(1)
val coroutineContext = coroutineContext // workaround for KT-22984
- val coroutine = object : AbstractCoroutine<String>(coroutineContext, false) {
+ val coroutine = object : AbstractCoroutine<String>(coroutineContext, true, false) {
override fun onStart() {
expect(3)
}
@@ -53,7 +53,7 @@ class AbstractCoroutineTest : TestBase() {
fun testNotificationsWithException() = runTest {
expect(1)
val coroutineContext = coroutineContext // workaround for KT-22984
- val coroutine = object : AbstractCoroutine<String>(coroutineContext + NonCancellable, false) {
+ val coroutine = object : AbstractCoroutine<String>(coroutineContext + NonCancellable, true, false) {
override fun onStart() {
expect(3)
}
@@ -91,4 +91,4 @@ class AbstractCoroutineTest : TestBase() {
coroutine.resumeWithException(TestException2())
finish(10)
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/common/test/CancelledParentAttachTest.kt b/kotlinx-coroutines-core/common/test/CancelledParentAttachTest.kt
new file mode 100644
index 00000000..749bbfc9
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/CancelledParentAttachTest.kt
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines
+
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.internal.*
+import kotlin.test.*
+
+class CancelledParentAttachTest : TestBase() {
+
+ @Test
+ fun testAsync() = CoroutineStart.values().forEach(::testAsyncCancelledParent)
+
+ private fun testAsyncCancelledParent(start: CoroutineStart) =
+ runTest({ it is CancellationException }) {
+ cancel()
+ expect(1)
+ val d = async<Int>(start = start) { 42 }
+ expect(2)
+ d.invokeOnCompletion {
+ finish(3)
+ reset()
+ }
+ }
+
+ @Test
+ fun testLaunch() = CoroutineStart.values().forEach(::testLaunchCancelledParent)
+
+ private fun testLaunchCancelledParent(start: CoroutineStart) =
+ runTest({ it is CancellationException }) {
+ cancel()
+ expect(1)
+ val d = launch(start = start) { }
+ expect(2)
+ d.invokeOnCompletion {
+ finish(3)
+ reset()
+ }
+ }
+
+ @Test
+ fun testProduce() =
+ runTest({ it is CancellationException }) {
+ cancel()
+ expect(1)
+ val d = produce<Int> { }
+ expect(2)
+ (d as Job).invokeOnCompletion {
+ finish(3)
+ reset()
+ }
+ }
+
+ @Test
+ fun testBroadcast() = CoroutineStart.values().forEach(::testBroadcastCancelledParent)
+
+ private fun testBroadcastCancelledParent(start: CoroutineStart) =
+ runTest({ it is CancellationException }) {
+ cancel()
+ expect(1)
+ val bc = broadcast<Int>(start = start) {}
+ expect(2)
+ (bc as Job).invokeOnCompletion {
+ finish(3)
+ reset()
+ }
+ }
+
+ @Test
+ fun testScopes() {
+ testScope { coroutineScope { } }
+ testScope { supervisorScope { } }
+ testScope { flowScope { } }
+ testScope { withTimeout(Long.MAX_VALUE) { } }
+ testScope { withContext(Job()) { } }
+ testScope { withContext(CoroutineName("")) { } }
+ }
+
+ private inline fun testScope(crossinline block: suspend () -> Unit) = runTest({ it is CancellationException }) {
+ cancel()
+ block()
+ }
+}
diff --git a/kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt
index a7084296..2d71cc94 100644
--- a/kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -46,7 +46,7 @@ class ArrayBroadcastChannelTest : TestBase() {
assertEquals(2, first.receive()) // suspends
assertFalse(first.isClosedForReceive)
expect(10)
- assertNull(first.receiveOrNull()) // suspends
+ assertTrue(first.receiveCatching().isClosed) // suspends
assertTrue(first.isClosedForReceive)
expect(14)
}
@@ -62,7 +62,7 @@ class ArrayBroadcastChannelTest : TestBase() {
assertEquals(2, second.receive()) // suspends
assertFalse(second.isClosedForReceive)
expect(11)
- assertNull(second.receiveOrNull()) // suspends
+ assertNull(second.receiveCatching().getOrNull()) // suspends
assertTrue(second.isClosedForReceive)
expect(15)
}
@@ -116,9 +116,9 @@ class ArrayBroadcastChannelTest : TestBase() {
expect(6)
assertFalse(sub.isClosedForReceive)
for (x in 1..3)
- assertEquals(x, sub.receiveOrNull())
+ assertEquals(x, sub.receiveCatching().getOrNull())
// and receive close signal
- assertNull(sub.receiveOrNull())
+ assertNull(sub.receiveCatching().getOrNull())
assertTrue(sub.isClosedForReceive)
finish(7)
}
@@ -153,7 +153,7 @@ class ArrayBroadcastChannelTest : TestBase() {
// make sure all of them are consumed
check(!sub.isClosedForReceive)
for (x in 1..5) check(sub.receive() == x)
- check(sub.receiveOrNull() == null)
+ check(sub.receiveCatching().getOrNull() == null)
check(sub.isClosedForReceive)
}
@@ -196,7 +196,7 @@ class ArrayBroadcastChannelTest : TestBase() {
val channel = BroadcastChannel<Int>(1)
val subscription = channel.openSubscription()
subscription.cancel(TestCancellationException())
- subscription.receiveOrNull()
+ subscription.receive()
}
@Test
@@ -208,6 +208,6 @@ class ArrayBroadcastChannelTest : TestBase() {
channel.cancel()
assertTrue(channel.isClosedForSend)
assertTrue(sub.isClosedForReceive)
- check(sub.receiveOrNull() == null)
+ check(sub.receiveCatching().getOrNull() == null)
}
}
diff --git a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt
index a57b519f..632fd292 100644
--- a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -38,17 +38,17 @@ class ArrayChannelTest : TestBase() {
}
@Test
- fun testClosedBufferedReceiveOrNull() = runTest {
+ fun testClosedBufferedReceiveCatching() = runTest {
val q = Channel<Int>(1)
check(q.isEmpty && !q.isClosedForSend && !q.isClosedForReceive)
expect(1)
launch {
expect(5)
check(!q.isEmpty && q.isClosedForSend && !q.isClosedForReceive)
- assertEquals(42, q.receiveOrNull())
+ assertEquals(42, q.receiveCatching().getOrNull())
expect(6)
check(!q.isEmpty && q.isClosedForSend && q.isClosedForReceive)
- assertNull(q.receiveOrNull())
+ assertNull(q.receiveCatching().getOrNull())
expect(7)
}
expect(2)
@@ -86,31 +86,31 @@ class ArrayChannelTest : TestBase() {
}
@Test
- fun testOfferAndPoll() = runTest {
+ fun testTryOp() = runTest {
val q = Channel<Int>(1)
- assertTrue(q.offer(1))
+ assertTrue(q.trySend(1).isSuccess)
expect(1)
launch {
expect(3)
- assertEquals(1, q.poll())
+ assertEquals(1, q.tryReceive().getOrNull())
expect(4)
- assertNull(q.poll())
+ assertNull(q.tryReceive().getOrNull())
expect(5)
assertEquals(2, q.receive()) // suspends
expect(9)
- assertEquals(3, q.poll())
+ assertEquals(3, q.tryReceive().getOrNull())
expect(10)
- assertNull(q.poll())
+ assertNull(q.tryReceive().getOrNull())
expect(11)
}
expect(2)
yield()
expect(6)
- assertTrue(q.offer(2))
+ assertTrue(q.trySend(2).isSuccess)
expect(7)
- assertTrue(q.offer(3))
+ assertTrue(q.trySend(3).isSuccess)
expect(8)
- assertFalse(q.offer(4))
+ assertFalse(q.trySend(4).isSuccess)
yield()
finish(12)
}
@@ -134,7 +134,7 @@ class ArrayChannelTest : TestBase() {
q.cancel()
check(q.isClosedForSend)
check(q.isClosedForReceive)
- assertFailsWith<CancellationException> { q.receiveOrNull() }
+ assertFailsWith<CancellationException> { q.receiveCatching().getOrThrow() }
finish(12)
}
@@ -142,7 +142,7 @@ class ArrayChannelTest : TestBase() {
fun testCancelWithCause() = runTest({ it is TestCancellationException }) {
val channel = Channel<Int>(5)
channel.cancel(TestCancellationException())
- channel.receiveOrNull()
+ channel.receive()
}
@Test
@@ -157,10 +157,10 @@ class ArrayChannelTest : TestBase() {
val capacity = 42
val channel = Channel<Int>(capacity)
repeat(4) {
- channel.offer(-1)
+ channel.trySend(-1)
}
repeat(4) {
- channel.receiveOrNull()
+ channel.receiveCatching().getOrNull()
}
checkBufferChannel(channel, capacity)
}
diff --git a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt
index 91d941b3..4538f6c6 100644
--- a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -15,28 +15,23 @@ class BasicOperationsTest : TestBase() {
}
@Test
- fun testOfferAfterClose() = runTest {
- TestChannelKind.values().forEach { kind -> testOffer(kind) }
+ fun testTrySendToFullChannel() = runTest {
+ TestChannelKind.values().forEach { kind -> testTrySendToFullChannel(kind) }
}
@Test
- fun testSendAfterClose() = runTest {
- TestChannelKind.values().forEach { kind -> testSendAfterClose(kind) }
- }
-
- @Test
- fun testReceiveOrNullAfterClose() = runTest {
- TestChannelKind.values().forEach { kind -> testReceiveOrNull(kind) }
+ fun testTrySendAfterClose() = runTest {
+ TestChannelKind.values().forEach { kind -> testTrySend(kind) }
}
@Test
- fun testReceiveOrNullAfterCloseWithException() = runTest {
- TestChannelKind.values().forEach { kind -> testReceiveOrNullException(kind) }
+ fun testSendAfterClose() = runTest {
+ TestChannelKind.values().forEach { kind -> testSendAfterClose(kind) }
}
@Test
- fun testReceiveOrClosed() = runTest {
- TestChannelKind.values().forEach { kind -> testReceiveOrClosed(kind) }
+ fun testReceiveCatching() = runTest {
+ TestChannelKind.values().forEach { kind -> testReceiveCatching(kind) }
}
@Test
@@ -49,7 +44,7 @@ class BasicOperationsTest : TestBase() {
}
}
expect(1)
- channel.offer(42)
+ channel.trySend(42)
expect(2)
channel.close(AssertionError())
finish(4)
@@ -90,47 +85,8 @@ class BasicOperationsTest : TestBase() {
}
}
- private suspend fun testReceiveOrNull(kind: TestChannelKind) = coroutineScope {
- val channel = kind.create<Int>()
- val d = async(NonCancellable) {
- channel.receive()
- }
-
- yield()
- channel.close()
- assertTrue(channel.isClosedForReceive)
-
- assertNull(channel.receiveOrNull())
- assertNull(channel.poll())
-
- d.join()
- assertTrue(d.getCancellationException().cause is ClosedReceiveChannelException)
- }
-
- private suspend fun testReceiveOrNullException(kind: TestChannelKind) = coroutineScope {
- val channel = kind.create<Int>()
- val d = async(NonCancellable) {
- channel.receive()
- }
-
- yield()
- channel.close(TestException())
- assertTrue(channel.isClosedForReceive)
-
- assertFailsWith<TestException> { channel.poll() }
- try {
- channel.receiveOrNull()
- fail()
- } catch (e: TestException) {
- // Expected
- }
-
- d.join()
- assertTrue(d.getCancellationException().cause is TestException)
- }
-
@Suppress("ReplaceAssertBooleanWithAssertEquality")
- private suspend fun testReceiveOrClosed(kind: TestChannelKind) = coroutineScope {
+ private suspend fun testReceiveCatching(kind: TestChannelKind) = coroutineScope {
reset()
val channel = kind.create<Int>()
launch {
@@ -139,44 +95,58 @@ class BasicOperationsTest : TestBase() {
}
expect(1)
- val result = channel.receiveOrClosed()
- assertEquals(1, result.value)
- assertEquals(1, result.valueOrNull)
- assertTrue(ValueOrClosed.value(1) == result)
+ val result = channel.receiveCatching()
+ assertEquals(1, result.getOrThrow())
+ assertEquals(1, result.getOrNull())
+ assertTrue(ChannelResult.success(1) == result)
expect(3)
launch {
expect(4)
channel.close()
}
- val closed = channel.receiveOrClosed()
+ val closed = channel.receiveCatching()
expect(5)
- assertNull(closed.valueOrNull)
+ assertNull(closed.getOrNull())
assertTrue(closed.isClosed)
- assertNull(closed.closeCause)
- assertTrue(ValueOrClosed.closed<Int>(closed.closeCause) == closed)
+ assertNull(closed.exceptionOrNull())
+ assertTrue(ChannelResult.closed<Int>(closed.exceptionOrNull()) == closed)
finish(6)
}
- private suspend fun testOffer(kind: TestChannelKind) = coroutineScope {
+ private suspend fun testTrySend(kind: TestChannelKind) = coroutineScope {
val channel = kind.create<Int>()
val d = async { channel.send(42) }
yield()
channel.close()
assertTrue(channel.isClosedForSend)
- try {
- channel.offer(2)
- fail()
- } catch (e: ClosedSendChannelException) {
- if (!kind.isConflated) {
- assertEquals(42, channel.receive())
+ channel.trySend(2)
+ .onSuccess { expectUnreached() }
+ .onClosed {
+ assertTrue { it is ClosedSendChannelException}
+ if (!kind.isConflated) {
+ assertEquals(42, channel.receive())
+ }
}
- }
-
d.await()
}
+ private suspend fun testTrySendToFullChannel(kind: TestChannelKind) = coroutineScope {
+ if (kind.isConflated || kind.capacity == Int.MAX_VALUE) return@coroutineScope
+ val channel = kind.create<Int>()
+ // Make it full
+ repeat(11) {
+ channel.trySend(42)
+ }
+ channel.trySend(1)
+ .onSuccess { expectUnreached() }
+ .onFailure { assertNull(it) }
+ .onClosed {
+ expectUnreached()
+ }
+ }
+
/**
* [ClosedSendChannelException] should not be eaten.
* See [https://github.com/Kotlin/kotlinx.coroutines/issues/957]
diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelBufferOverflowTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelBufferOverflowTest.kt
index 41f60479..0b9a0fdb 100644
--- a/kotlinx-coroutines-core/common/test/channels/ChannelBufferOverflowTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ChannelBufferOverflowTest.kt
@@ -11,30 +11,30 @@ class ChannelBufferOverflowTest : TestBase() {
@Test
fun testDropLatest() = runTest {
val c = Channel<Int>(2, BufferOverflow.DROP_LATEST)
- assertTrue(c.offer(1))
- assertTrue(c.offer(2))
- assertTrue(c.offer(3)) // overflows, dropped
+ assertTrue(c.trySend(1).isSuccess)
+ assertTrue(c.trySend(2).isSuccess)
+ assertTrue(c.trySend(3).isSuccess) // overflows, dropped
c.send(4) // overflows dropped
assertEquals(1, c.receive())
- assertTrue(c.offer(5))
- assertTrue(c.offer(6)) // overflows, dropped
+ assertTrue(c.trySend(5).isSuccess)
+ assertTrue(c.trySend(6).isSuccess) // overflows, dropped
assertEquals(2, c.receive())
assertEquals(5, c.receive())
- assertEquals(null, c.poll())
+ assertEquals(null, c.tryReceive().getOrNull())
}
@Test
fun testDropOldest() = runTest {
val c = Channel<Int>(2, BufferOverflow.DROP_OLDEST)
- assertTrue(c.offer(1))
- assertTrue(c.offer(2))
- assertTrue(c.offer(3)) // overflows, keeps 2, 3
+ assertTrue(c.trySend(1).isSuccess)
+ assertTrue(c.trySend(2).isSuccess)
+ assertTrue(c.trySend(3).isSuccess) // overflows, keeps 2, 3
c.send(4) // overflows, keeps 3, 4
assertEquals(3, c.receive())
- assertTrue(c.offer(5))
- assertTrue(c.offer(6)) // overflows, keeps 5, 6
+ assertTrue(c.trySend(5).isSuccess)
+ assertTrue(c.trySend(6).isSuccess) // overflows, keeps 5, 6
assertEquals(5, c.receive())
assertEquals(6, c.receive())
- assertEquals(null, c.poll())
+ assertEquals(null, c.tryReceive().getOrNull())
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt
new file mode 100644
index 00000000..2341c62e
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.channels
+
+import kotlinx.coroutines.*
+import kotlin.test.*
+
+class ChannelReceiveCatchingTest : TestBase() {
+ @Test
+ fun testChannelOfThrowables() = runTest {
+ val channel = Channel<Throwable>()
+ launch {
+ channel.send(TestException1())
+ channel.close(TestException2())
+ }
+
+ val element = channel.receiveCatching()
+ assertTrue(element.getOrThrow() is TestException1)
+ assertTrue(element.getOrNull() is TestException1)
+
+ val closed = channel.receiveCatching()
+ assertTrue(closed.isClosed)
+ assertTrue(closed.isFailure)
+ assertTrue(closed.exceptionOrNull() is TestException2)
+ }
+
+ @Test
+ @Suppress("ReplaceAssertBooleanWithAssertEquality") // inline classes test
+ fun testNullableIntChanel() = runTest {
+ val channel = Channel<Int?>()
+ launch {
+ expect(2)
+ channel.send(1)
+ expect(3)
+ channel.send(null)
+
+ expect(6)
+ channel.close()
+ }
+
+ expect(1)
+ val element = channel.receiveCatching()
+ assertEquals(1, element.getOrThrow())
+ assertEquals(1, element.getOrNull())
+ assertEquals("Value(1)", element.toString())
+ assertTrue(ChannelResult.success(1) == element) // Don't box
+ assertFalse(element.isFailure)
+ assertFalse(element.isClosed)
+
+ expect(4)
+ val nullElement = channel.receiveCatching()
+ assertNull(nullElement.getOrThrow())
+ assertNull(nullElement.getOrNull())
+ assertEquals("Value(null)", nullElement.toString())
+ assertTrue(ChannelResult.success(null) == nullElement) // Don't box
+ assertFalse(element.isFailure)
+ assertFalse(element.isClosed)
+
+ expect(5)
+ val closed = channel.receiveCatching()
+ assertTrue(closed.isClosed)
+ assertTrue(closed.isFailure)
+
+ val closed2 = channel.receiveCatching()
+ assertTrue(closed2.isClosed)
+ assertTrue(closed.isFailure)
+ assertNull(closed2.exceptionOrNull())
+ finish(7)
+ }
+
+ @Test
+ @ExperimentalUnsignedTypes
+ fun testUIntChannel() = runTest {
+ val channel = Channel<UInt>()
+ launch {
+ expect(2)
+ channel.send(1u)
+ yield()
+ expect(4)
+ channel.send((Long.MAX_VALUE - 1).toUInt())
+ expect(5)
+ }
+
+ expect(1)
+ val element = channel.receiveCatching()
+ assertEquals(1u, element.getOrThrow())
+
+ expect(3)
+ val element2 = channel.receiveCatching()
+ assertEquals((Long.MAX_VALUE - 1).toUInt(), element2.getOrThrow())
+ finish(6)
+ }
+
+ @Test
+ fun testCancelChannel() = runTest {
+ val channel = Channel<Boolean>()
+ launch {
+ expect(2)
+ channel.cancel()
+ }
+
+ expect(1)
+ val closed = channel.receiveCatching()
+ assertTrue(closed.isClosed)
+ assertTrue(closed.isFailure)
+ finish(3)
+ }
+
+ @Test
+ @ExperimentalUnsignedTypes
+ fun testReceiveResultChannel() = runTest {
+ val channel = Channel<ChannelResult<UInt>>()
+ launch {
+ channel.send(ChannelResult.success(1u))
+ channel.send(ChannelResult.closed(TestException1()))
+ channel.close(TestException2())
+ }
+
+ val intResult = channel.receiveCatching()
+ assertEquals(1u, intResult.getOrThrow().getOrThrow())
+ assertFalse(intResult.isFailure)
+ assertFalse(intResult.isClosed)
+
+ val closeCauseResult = channel.receiveCatching()
+ assertTrue(closeCauseResult.getOrThrow().exceptionOrNull() is TestException1)
+
+ val closeCause = channel.receiveCatching()
+ assertTrue(closeCause.isClosed)
+ assertTrue(closeCause.isFailure)
+ assertTrue(closeCause.exceptionOrNull() is TestException2)
+ }
+
+ @Test
+ fun testToString() = runTest {
+ val channel = Channel<String>(1)
+ channel.send("message")
+ channel.close(TestException1("OK"))
+ assertEquals("Value(message)", channel.receiveCatching().toString())
+ // toString implementation for exception differs on every platform
+ val str = channel.receiveCatching().toString()
+ if (!str.matches("Closed\\(.*TestException1: OK\\)".toRegex()))
+ error("Unexpected string: '$str'")
+ }
+}
diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelReceiveOrClosedTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelReceiveOrClosedTest.kt
deleted file mode 100644
index e58b0dee..00000000
--- a/kotlinx-coroutines-core/common/test/channels/ChannelReceiveOrClosedTest.kt
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.channels
-
-import kotlinx.coroutines.*
-import kotlin.test.*
-
-class ChannelReceiveOrClosedTest : TestBase() {
- @Test
- fun testChannelOfThrowables() = runTest {
- val channel = Channel<Throwable>()
- launch {
- channel.send(TestException1())
- channel.close(TestException2())
- }
-
- val element = channel.receiveOrClosed()
- assertTrue(element.value is TestException1)
- assertTrue(element.valueOrNull is TestException1)
-
- val closed = channel.receiveOrClosed()
- assertTrue(closed.isClosed)
- assertTrue(closed.closeCause is TestException2)
- }
-
- @Test
- @Suppress("ReplaceAssertBooleanWithAssertEquality") // inline classes test
- fun testNullableIntChanel() = runTest {
- val channel = Channel<Int?>()
- launch {
- expect(2)
- channel.send(1)
- expect(3)
- channel.send(null)
-
- expect(6)
- channel.close()
- }
-
- expect(1)
- val element = channel.receiveOrClosed()
- assertEquals(1, element.value)
- assertEquals(1, element.valueOrNull)
- assertEquals("Value(1)", element.toString())
- assertTrue(ValueOrClosed.value(1) == element) // Don't box
-
- expect(4)
- val nullElement = channel.receiveOrClosed()
- assertNull(nullElement.value)
- assertNull(nullElement.valueOrNull)
- assertEquals("Value(null)", nullElement.toString())
- assertTrue(ValueOrClosed.value(null) == nullElement) // Don't box
-
- expect(5)
- val closed = channel.receiveOrClosed()
- assertTrue(closed.isClosed)
-
- val closed2 = channel.receiveOrClosed()
- assertTrue(closed2.isClosed)
- assertNull(closed2.closeCause)
- finish(7)
- }
-
- @Test
- @ExperimentalUnsignedTypes
- fun testUIntChannel() = runTest {
- val channel = Channel<UInt>()
- launch {
- expect(2)
- channel.send(1u)
- yield()
- expect(4)
- channel.send((Long.MAX_VALUE - 1).toUInt())
- expect(5)
- }
-
- expect(1)
- val element = channel.receiveOrClosed()
- assertEquals(1u, element.value)
-
- expect(3)
- val element2 = channel.receiveOrClosed()
- assertEquals((Long.MAX_VALUE - 1).toUInt(), element2.value)
- finish(6)
- }
-
- @Test
- fun testCancelChannel() = runTest {
- val channel = Channel<Boolean>()
- launch {
- expect(2)
- channel.cancel()
- }
-
- expect(1)
- val closed = channel.receiveOrClosed()
- assertTrue(closed.isClosed)
- finish(3)
- }
-
- @Test
- @ExperimentalUnsignedTypes
- fun testReceiveResultChannel() = runTest {
- val channel = Channel<ValueOrClosed<UInt>>()
- launch {
- channel.send(ValueOrClosed.value(1u))
- channel.send(ValueOrClosed.closed(TestException1()))
- channel.close(TestException2())
- }
-
- val intResult = channel.receiveOrClosed()
- assertEquals(1u, intResult.value.value)
-
- val closeCauseResult = channel.receiveOrClosed()
- assertTrue(closeCauseResult.value.closeCause is TestException1)
-
- val closeCause = channel.receiveOrClosed()
- assertTrue(closeCause.isClosed)
- assertTrue(closeCause.closeCause is TestException2)
- }
-
- @Test
- fun testToString() = runTest {
- val channel = Channel<String>(1)
- channel.send("message")
- channel.close(TestException1("OK"))
- assertEquals("Value(message)", channel.receiveOrClosed().toString())
- // toString implementation for exception differs on every platform
- val str = channel.receiveOrClosed().toString()
- if (!str.matches("Closed\\(.*TestException1: OK\\)".toRegex()))
- error("Unexpected string: '$str'")
- }
-}
diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt
index d2ef3d26..ae05fb8d 100644
--- a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -70,36 +70,10 @@ class ChannelUndeliveredElementFailureTest : TestBase() {
}
@Test
- fun testReceiveOrNullCancelledFail() = runTest(unhandled = shouldBeUnhandled) {
+ fun testReceiveCatchingCancelledFail() = runTest(unhandled = shouldBeUnhandled) {
val channel = Channel(onUndeliveredElement = onCancelFail)
val job = launch(start = CoroutineStart.UNDISPATCHED) {
- channel.receiveOrNull()
- expectUnreached() // will be cancelled before it dispatches
- }
- channel.send(item)
- job.cancel()
- }
-
- @Test
- fun testReceiveOrNullSelectCancelledFail() = runTest(unhandled = shouldBeUnhandled) {
- val channel = Channel(onUndeliveredElement = onCancelFail)
- val job = launch(start = CoroutineStart.UNDISPATCHED) {
- select<Unit> {
- channel.onReceiveOrNull {
- expectUnreached()
- }
- }
- expectUnreached() // will be cancelled before it dispatches
- }
- channel.send(item)
- job.cancel()
- }
-
- @Test
- fun testReceiveOrClosedCancelledFail() = runTest(unhandled = shouldBeUnhandled) {
- val channel = Channel(onUndeliveredElement = onCancelFail)
- val job = launch(start = CoroutineStart.UNDISPATCHED) {
- channel.receiveOrClosed()
+ channel.receiveCatching()
expectUnreached() // will be cancelled before it dispatches
}
channel.send(item)
@@ -111,7 +85,7 @@ class ChannelUndeliveredElementFailureTest : TestBase() {
val channel = Channel(onUndeliveredElement = onCancelFail)
val job = launch(start = CoroutineStart.UNDISPATCHED) {
select<Unit> {
- channel.onReceiveOrClosed {
+ channel.onReceiveCatching {
expectUnreached()
}
}
@@ -140,4 +114,4 @@ class ChannelUndeliveredElementFailureTest : TestBase() {
expectUnreached()
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt
index 7dd232f2..a8c2a29c 100644
--- a/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -42,7 +42,7 @@ class ConflatedBroadcastChannelTest : TestBase() {
launch(start = CoroutineStart.UNDISPATCHED) {
expect(2)
val sub = broadcast.openSubscription()
- assertNull(sub.poll())
+ assertNull(sub.tryReceive().getOrNull())
expect(3)
assertEquals("one", sub.receive()) // suspends
expect(6)
@@ -68,7 +68,7 @@ class ConflatedBroadcastChannelTest : TestBase() {
expect(14)
assertEquals("three", sub.receive()) // suspends
expect(17)
- assertNull(sub.receiveOrNull()) // suspends until closed
+ assertNull(sub.receiveCatching().getOrNull()) // suspends until closed
expect(20)
sub.cancel()
expect(21)
diff --git a/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt
index 18f28438..370fd5b9 100644
--- a/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -12,14 +12,14 @@ open class ConflatedChannelTest : TestBase() {
Channel<T>(Channel.CONFLATED)
@Test
- fun testBasicConflationOfferPoll() {
+ fun testBasicConflationOfferTryReceive() {
val q = createConflatedChannel<Int>()
- assertNull(q.poll())
- assertTrue(q.offer(1))
- assertTrue(q.offer(2))
- assertTrue(q.offer(3))
- assertEquals(3, q.poll())
- assertNull(q.poll())
+ assertNull(q.tryReceive().getOrNull())
+ assertTrue(q.trySend(1).isSuccess)
+ assertTrue(q.trySend(2).isSuccess)
+ assertTrue(q.trySend(3).isSuccess)
+ assertEquals(3, q.tryReceive().getOrNull())
+ assertNull(q.tryReceive().getOrNull())
}
@Test
@@ -27,7 +27,7 @@ open class ConflatedChannelTest : TestBase() {
val q = createConflatedChannel<Int>()
q.send(1)
q.send(2) // shall conflated previously sent
- assertEquals(2, q.receiveOrNull())
+ assertEquals(2, q.receiveCatching().getOrNull())
}
@Test
@@ -41,7 +41,7 @@ open class ConflatedChannelTest : TestBase() {
// not it is closed for receive, too
assertTrue(q.isClosedForSend)
assertTrue(q.isClosedForReceive)
- assertNull(q.receiveOrNull())
+ assertNull(q.receiveCatching().getOrNull())
}
@Test
@@ -82,7 +82,7 @@ open class ConflatedChannelTest : TestBase() {
q.cancel()
check(q.isClosedForSend)
check(q.isClosedForReceive)
- assertFailsWith<CancellationException> { q.receiveOrNull() }
+ assertFailsWith<CancellationException> { q.receiveCatching().getOrThrow() }
finish(2)
}
@@ -90,6 +90,6 @@ open class ConflatedChannelTest : TestBase() {
fun testCancelWithCause() = runTest({ it is TestCancellationException }) {
val channel = createConflatedChannel<Int>()
channel.cancel(TestCancellationException())
- channel.receiveOrNull()
+ channel.receive()
}
}
diff --git a/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt
index 4233a350..501affb4 100644
--- a/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -12,14 +12,14 @@ class LinkedListChannelTest : TestBase() {
fun testBasic() = runTest {
val c = Channel<Int>(Channel.UNLIMITED)
c.send(1)
- check(c.offer(2))
+ assertTrue(c.trySend(2).isSuccess)
c.send(3)
check(c.close())
check(!c.close())
assertEquals(1, c.receive())
- assertEquals(2, c.poll())
- assertEquals(3, c.receiveOrNull())
- assertNull(c.receiveOrNull())
+ assertEquals(2, c.tryReceive().getOrNull())
+ assertEquals(3, c.receiveCatching().getOrNull())
+ assertNull(c.receiveCatching().getOrNull())
}
@Test
@@ -31,13 +31,13 @@ class LinkedListChannelTest : TestBase() {
q.cancel()
check(q.isClosedForSend)
check(q.isClosedForReceive)
- assertFailsWith<CancellationException> { q.receiveOrNull() }
+ assertFailsWith<CancellationException> { q.receive() }
}
@Test
fun testCancelWithCause() = runTest({ it is TestCancellationException }) {
val channel = Channel<Int>(Channel.UNLIMITED)
channel.cancel(TestCancellationException())
- channel.receiveOrNull()
+ channel.receive()
}
}
diff --git a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
index 194504e7..61ef0726 100644
--- a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
@@ -24,7 +24,7 @@ class ProduceTest : TestBase() {
expect(4)
check(c.receive() == 2)
expect(5)
- check(c.receiveOrNull() == null)
+ assertNull(c.receiveCatching().getOrNull())
finish(7)
}
@@ -49,7 +49,7 @@ class ProduceTest : TestBase() {
expect(4)
c.cancel()
expect(5)
- assertFailsWith<CancellationException> { c.receiveOrNull() }
+ assertFailsWith<CancellationException> { c.receiveCatching().getOrThrow() }
expect(6)
yield() // to produce
finish(8)
@@ -76,7 +76,7 @@ class ProduceTest : TestBase() {
expect(4)
c.cancel(TestCancellationException())
try {
- assertNull(c.receiveOrNull())
+ c.receive()
expectUnreached()
} catch (e: TestCancellationException) {
expect(5)
diff --git a/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt
index 4d20d715..c83813e4 100644
--- a/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -36,15 +36,15 @@ class RendezvousChannelTest : TestBase() {
}
@Test
- fun testClosedReceiveOrNull() = runTest {
+ fun testClosedReceiveCatching() = runTest {
val q = Channel<Int>(Channel.RENDEZVOUS)
check(q.isEmpty && !q.isClosedForSend && !q.isClosedForReceive)
expect(1)
launch {
expect(3)
- assertEquals(42, q.receiveOrNull())
+ assertEquals(42, q.receiveCatching().getOrNull())
expect(4)
- assertNull(q.receiveOrNull())
+ assertNull(q.receiveCatching().getOrNull())
expect(6)
}
expect(2)
@@ -80,26 +80,26 @@ class RendezvousChannelTest : TestBase() {
}
@Test
- fun testOfferAndPool() = runTest {
+ fun testTrySendTryReceive() = runTest {
val q = Channel<Int>(Channel.RENDEZVOUS)
- assertFalse(q.offer(1))
+ assertFalse(q.trySend(1).isSuccess)
expect(1)
launch {
expect(3)
- assertNull(q.poll())
+ assertNull(q.tryReceive().getOrNull())
expect(4)
assertEquals(2, q.receive())
expect(7)
- assertNull(q.poll())
+ assertNull(q.tryReceive().getOrNull())
yield()
expect(9)
- assertEquals(3, q.poll())
+ assertEquals(3, q.tryReceive().getOrNull())
expect(10)
}
expect(2)
yield()
expect(5)
- assertTrue(q.offer(2))
+ assertTrue(q.trySend(2).isSuccess)
expect(6)
yield()
expect(8)
@@ -233,9 +233,9 @@ class RendezvousChannelTest : TestBase() {
expect(7)
yield() // try to resume sender (it will not resume despite the close!)
expect(8)
- assertEquals(42, q.receiveOrNull())
+ assertEquals(42, q.receiveCatching().getOrNull())
expect(9)
- assertNull(q.receiveOrNull())
+ assertNull(q.receiveCatching().getOrNull())
expect(10)
yield() // to sender, it was resumed!
finish(12)
@@ -266,7 +266,7 @@ class RendezvousChannelTest : TestBase() {
q.cancel()
check(q.isClosedForSend)
check(q.isClosedForReceive)
- assertFailsWith<CancellationException> { q.receiveOrNull() }
+ assertFailsWith<CancellationException> { q.receiveCatching().getOrThrow() }
finish(12)
}
@@ -274,6 +274,6 @@ class RendezvousChannelTest : TestBase() {
fun testCancelWithCause() = runTest({ it is TestCancellationException }) {
val channel = Channel<Int>(Channel.RENDEZVOUS)
channel.cancel(TestCancellationException())
- channel.receiveOrNull()
+ channel.receiveCatching().getOrThrow()
}
}
diff --git a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt
index 993be78e..f234e141 100644
--- a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt
+++ b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
@@ -42,11 +42,10 @@ private class ChannelViaBroadcast<E>(
override val isEmpty: Boolean get() = sub.isEmpty
override suspend fun receive(): E = sub.receive()
- override suspend fun receiveOrNull(): E? = sub.receiveOrNull()
- override suspend fun receiveOrClosed(): ValueOrClosed<E> = sub.receiveOrClosed()
- override fun poll(): E? = sub.poll()
+ override suspend fun receiveCatching(): ChannelResult<E> = sub.receiveCatching()
override fun iterator(): ChannelIterator<E> = sub.iterator()
-
+ override fun tryReceive(): ChannelResult<E> = sub.tryReceive()
+
override fun cancel(cause: CancellationException?) = sub.cancel(cause)
// implementing hidden method anyway, so can cast to an internal class
@@ -55,8 +54,6 @@ private class ChannelViaBroadcast<E>(
override val onReceive: SelectClause1<E>
get() = sub.onReceive
- override val onReceiveOrNull: SelectClause1<E?>
- get() = sub.onReceiveOrNull
- override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
- get() = sub.onReceiveOrClosed
+ override val onReceiveCatching: SelectClause1<ChannelResult<E>>
+ get() = sub.onReceiveCatching
}
diff --git a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt
index ddb1d88a..b2d957be 100644
--- a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt
+++ b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt
@@ -26,7 +26,7 @@ internal class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : Coroutine
?: error("Event loop is missing, virtual time source works only as part of event loop")
if (delayNanos <= 0) continue
if (delayNanos > 0 && delayNanos != Long.MAX_VALUE) error("Unexpected external delay: $delayNanos")
- val nextTask = heap.minBy { it.deadline } ?: return@launch
+ val nextTask = heap.minByOrNull { it.deadline } ?: return@launch
heap.remove(nextTask)
currentTime = nextTask.deadline
nextTask.run()
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
index f93d0399..410955ce 100644
--- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt
@@ -128,145 +128,6 @@ class ChannelBuildersFlowTest : TestBase() {
}
@Test
- fun testBroadcastChannelAsFlow() = runTest {
- val channel = broadcast {
- repeat(10) {
- send(it + 1)
- }
- }
-
- val sum = channel.asFlow().sum()
- assertEquals(55, sum)
- }
-
- @Test
- fun testExceptionInBroadcast() = runTest {
- expect(1)
- val channel = broadcast(NonCancellable) { // otherwise failure will cancel scope as well
- repeat(10) {
- send(it + 1)
- }
- throw TestException()
- }
- assertEquals(15, channel.asFlow().take(5).sum())
-
- // Workaround for JS bug
- try {
- channel.asFlow().collect { /* Do nothing */ }
- expectUnreached()
- } catch (e: TestException) {
- finish(2)
- }
- }
-
- @Test
- fun testBroadcastChannelAsFlowLimits() = runTest {
- val channel = BroadcastChannel<Int>(1)
- val flow = channel.asFlow().map { it * it }.drop(1).take(2)
-
- var expected = 0
- launch {
- assertTrue(channel.offer(1)) // Handed to the coroutine
- assertTrue(channel.offer(2)) // Buffered
- assertFalse(channel.offer(3)) // Failed to offer
- channel.send(3)
- yield()
- assertEquals(1, expected)
- assertTrue(channel.offer(4)) // Handed to the coroutine
- assertTrue(channel.offer(5)) // Buffered
- assertFalse(channel.offer(6)) // Failed to offer
- channel.send(6)
- assertEquals(2, expected)
- }
-
- val sum = flow.sum()
- assertEquals(13, sum)
- ++expected
- val sum2 = flow.sum()
- assertEquals(61, sum2)
- ++expected
- }
-
- @Test
- fun flowAsBroadcast() = runTest {
- val flow = flow {
- repeat(10) {
- emit(it)
- }
- }
-
- val channel = flow.broadcastIn(this)
- assertEquals((0..9).toList(), channel.openSubscription().toList())
- }
-
- @Test
- fun flowAsBroadcastMultipleSubscription() = runTest {
- val flow = flow {
- repeat(10) {
- emit(it)
- }
- }
-
- val broadcast = flow.broadcastIn(this)
- val channel = broadcast.openSubscription()
- val channel2 = broadcast.openSubscription()
-
- assertEquals(0, channel.receive())
- assertEquals(0, channel2.receive())
- yield()
- assertEquals(1, channel.receive())
- assertEquals(1, channel2.receive())
-
- channel.cancel()
- channel2.cancel()
- yield()
- ensureActive()
- }
-
- @Test
- fun flowAsBroadcastException() = runTest {
- val flow = flow {
- repeat(10) {
- emit(it)
- }
-
- throw TestException()
- }
-
- val channel = flow.broadcastIn(this + NonCancellable)
- assertFailsWith<TestException> { channel.openSubscription().toList() }
- assertTrue(channel.isClosedForSend) // Failure in the flow fails the channel
- }
-
- // Semantics of these tests puzzle me, we should figure out the way to prohibit such chains
- @Test
- fun testFlowAsBroadcastAsFlow() = runTest {
- val flow = flow {
- emit(1)
- emit(2)
- emit(3)
- }.broadcastIn(this).asFlow()
-
- assertEquals(6, flow.sum())
- assertEquals(0, flow.sum()) // Well suddenly flow is no longer idempotent and cold
- }
-
- @Test
- fun testBroadcastAsFlowAsBroadcast() = runTest {
- val channel = broadcast {
- send(1)
- }.asFlow().broadcastIn(this)
-
- channel.openSubscription().consumeEach {
- assertEquals(1, it)
- }
-
- channel.openSubscription().consumeEach {
- fail()
- }
- }
-
- @Test
fun testProduceInAtomicity() = runTest {
val flow = flowOf(1).onCompletion { expect(2) }
val scope = CoroutineScope(wrapperDispatcher())
diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
index 31a929b2..f197a214 100644
--- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt
@@ -12,9 +12,9 @@ class ChannelFlowTest : TestBase() {
@Test
fun testRegular() = runTest {
val flow = channelFlow {
- assertTrue(offer(1))
- assertTrue(offer(2))
- assertTrue(offer(3))
+ assertTrue(trySend(1).isSuccess)
+ assertTrue(trySend(2).isSuccess)
+ assertTrue(trySend(3).isSuccess)
}
assertEquals(listOf(1, 2, 3), flow.toList())
}
@@ -22,9 +22,9 @@ class ChannelFlowTest : TestBase() {
@Test
fun testBuffer() = runTest {
val flow = channelFlow {
- assertTrue(offer(1))
- assertTrue(offer(2))
- assertFalse(offer(3))
+ assertTrue(trySend(1).isSuccess)
+ assertTrue(trySend(2).isSuccess)
+ assertFalse(trySend(3).isSuccess)
}.buffer(1)
assertEquals(listOf(1, 2), flow.toList())
}
@@ -32,10 +32,10 @@ class ChannelFlowTest : TestBase() {
@Test
fun testConflated() = runTest {
val flow = channelFlow {
- assertTrue(offer(1))
- assertTrue(offer(2))
- assertTrue(offer(3))
- assertTrue(offer(4))
+ assertTrue(trySend(1).isSuccess)
+ assertTrue(trySend(2).isSuccess)
+ assertTrue(trySend(3).isSuccess)
+ assertTrue(trySend(4).isSuccess)
}.buffer(Channel.CONFLATED)
assertEquals(listOf(1, 4), flow.toList()) // two elements in the middle got conflated
}
@@ -43,7 +43,7 @@ class ChannelFlowTest : TestBase() {
@Test
fun testFailureCancelsChannel() = runTest {
val flow = channelFlow {
- offer(1)
+ trySend(1)
invokeOnClose {
expect(2)
}
diff --git a/kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt b/kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt
index d41ab889..e8666479 100644
--- a/kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/internal/FlowScopeTest.kt
@@ -68,10 +68,10 @@ class FlowScopeTest : TestBase() {
flowScope {
flowScope {
launch {
- throw CancellationException(null)
+ throw CancellationException("")
}
}
}
}
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt
index f55e8bee..0ff2e0b8 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt
@@ -1,10 +1,11 @@
/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlin.test.*
@@ -290,4 +291,24 @@ class OnCompletionTest : TestBase() {
val expected = (1..5).toList() + (-1)
assertEquals(expected, result)
}
+
+ @Test
+ fun testCancelledEmitAllFlow() = runTest {
+ // emitAll does not call 'collect' on onCompletion collector
+ // if the target flow is empty
+ flowOf(1, 2, 3)
+ .onCompletion { emitAll(MutableSharedFlow()) }
+ .take(1)
+ .collect()
+ }
+
+ @Test
+ fun testCancelledEmitAllChannel() = runTest {
+ // emitAll does not call 'collect' on onCompletion collector
+ // if the target channel is empty
+ flowOf(1, 2, 3)
+ .onCompletion { emitAll(Channel()) }
+ .take(1)
+ .collect()
+ }
}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt
index 20e07873..c6be36d0 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/ScanTest.kt
@@ -24,6 +24,13 @@ class ScanTest : TestBase() {
}
@Test
+ fun testFoldWithInitial() = runTest {
+ val flow = flowOf(1, 2, 3)
+ val result = flow.runningFold(emptyList<Int>()) { acc, value -> acc + value }.toList()
+ assertEquals(listOf(emptyList(), listOf(1), listOf(1, 2), listOf(1, 2, 3)), result)
+ }
+
+ @Test
fun testNulls() = runTest {
val flow = flowOf(null, 2, null, null, null, 5)
val result = flow.runningReduce { acc, v -> if (v == null) acc else (if (acc == null) v else acc + v) }.toList()
diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt
index 371d0147..85a17ba0 100644
--- a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInFusionTest.kt
@@ -42,7 +42,7 @@ class ShareInFusionTest : TestBase() {
val flow = channelFlow {
// send a batch of 10 elements using [offer]
for (i in 1..10) {
- assertTrue(offer(i)) // offer must succeed, because buffer
+ assertTrue(trySend(i).isSuccess) // offer must succeed, because buffer
}
send(0) // done
}.buffer(10) // request a buffer of 10
@@ -53,4 +53,4 @@ class ShareInFusionTest : TestBase() {
.collect { i -> expect(i + 1) }
finish(12)
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt
index 42cdb1e1..db69e2bc 100644
--- a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt
@@ -167,11 +167,11 @@ class ShareInTest : TestBase() {
subs += shared
.onEach { value -> // only the first threshold subscribers get the value
when (i) {
- in 1..threshold -> log.offer("sub$i: $value")
+ in 1..threshold -> log.trySend("sub$i: $value")
else -> expectUnreached()
}
}
- .onCompletion { log.offer("sub$i: completion") }
+ .onCompletion { log.trySend("sub$i: completion") }
.launchIn(this)
checkStartTransition(i)
}
@@ -210,4 +210,4 @@ class ShareInTest : TestBase() {
stop()
}
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/StateFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/StateFlowTest.kt
index 0a2c0458..be4f8c53 100644
--- a/kotlinx-coroutines-core/common/test/flow/sharing/StateFlowTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/sharing/StateFlowTest.kt
@@ -174,23 +174,11 @@ class StateFlowTest : TestBase() {
}
@Test
- fun testReferenceUpdatesAndCAS() {
- val d0 = Data(0)
- val d0_1 = Data(0)
- val d1 = Data(1)
- val d1_1 = Data(1)
- val d1_2 = Data(1)
- val state = MutableStateFlow(d0)
- assertSame(d0, state.value)
- state.value = d0_1 // equal, nothing changes
- assertSame(d0, state.value)
- state.value = d1 // updates
- assertSame(d1, state.value)
- assertFalse(state.compareAndSet(d0, d0)) // wrong value
- assertSame(d1, state.value)
- assertTrue(state.compareAndSet(d1_1, d1_2)) // "updates", but ref stays
- assertSame(d1, state.value)
- assertTrue(state.compareAndSet(d1_1, d0)) // updates, reference changes
- assertSame(d0, state.value)
+ fun testUpdate() = runTest {
+ val state = MutableStateFlow(0)
+ state.update { it + 2 }
+ assertEquals(2, state.value)
+ state.update { it + 3 }
+ assertEquals(5, state.value)
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/LastTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/LastTest.kt
new file mode 100644
index 00000000..e7699ccc
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/terminal/LastTest.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlin.test.*
+
+class LastTest : TestBase() {
+ @Test
+ fun testLast() = runTest {
+ val flow = flowOf(1, 2, 3)
+ assertEquals(3, flow.last())
+ assertEquals(3, flow.lastOrNull())
+ }
+
+ @Test
+ fun testNulls() = runTest {
+ val flow = flowOf(1, null)
+ assertNull(flow.last())
+ assertNull(flow.lastOrNull())
+ }
+
+ @Test
+ fun testNullsLastOrNull() = runTest {
+ val flow = flowOf(null, 1)
+ assertEquals(1, flow.lastOrNull())
+ }
+
+ @Test
+ fun testEmptyFlow() = runTest {
+ assertFailsWith<NoSuchElementException> { emptyFlow<Int>().last() }
+ assertNull(emptyFlow<Int>().lastOrNull())
+ }
+
+ @Test
+ fun testBadClass() = runTest {
+ val instance = BadClass()
+ val flow = flowOf(instance)
+ assertSame(instance, flow.last())
+ assertSame(instance, flow.lastOrNull())
+
+ }
+}
diff --git a/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt
index a4f8c3ba..0158c843 100644
--- a/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.selects
@@ -295,10 +295,10 @@ class SelectArrayChannelTest : TestBase() {
}
expect(2)
select<Unit> {
- channel.onReceiveOrClosed {
+ channel.onReceiveCatching {
expect(5)
assertTrue(it.isClosed)
- assertNull(it.closeCause)
+ assertNull(it.exceptionOrNull())
}
}
@@ -316,10 +316,10 @@ class SelectArrayChannelTest : TestBase() {
}
expect(2)
select<Unit> {
- channel.onReceiveOrClosed {
+ channel.onReceiveCatching {
expect(5)
assertTrue(it.isClosed)
- assertTrue(it.closeCause is TestException)
+ assertTrue(it.exceptionOrNull() is TestException)
}
}
@@ -327,16 +327,16 @@ class SelectArrayChannelTest : TestBase() {
}
@Test
- fun testSelectReceiveOrClosed() = runTest {
+ fun testSelectReceiveCatching() = runTest {
val c = Channel<Int>(1)
val iterations = 10
expect(1)
val job = launch {
repeat(iterations) {
select<Unit> {
- c.onReceiveOrClosed { v ->
+ c.onReceiveCatching { v ->
expect(4 + it * 2)
- assertEquals(it, v.value)
+ assertEquals(it, v.getOrNull())
}
}
}
@@ -360,9 +360,9 @@ class SelectArrayChannelTest : TestBase() {
launch {
expect(3)
val res = select<String> {
- c.onReceiveOrClosed { v ->
+ c.onReceiveCatching { v ->
expect(6)
- assertEquals(42, v.value)
+ assertEquals(42, v.getOrNull())
yield() // back to main
expect(8)
"OK"
@@ -396,9 +396,9 @@ class SelectArrayChannelTest : TestBase() {
expect(1)
select<Unit> {
expect(2)
- channel.onReceiveOrClosed {
+ channel.onReceiveCatching {
assertTrue(it.isClosed)
- assertNull(it.closeCause)
+ assertNull(it.exceptionOrNull())
finish(3)
}
}
@@ -412,9 +412,9 @@ class SelectArrayChannelTest : TestBase() {
expect(1)
select<Unit> {
expect(2)
- channel.onReceiveOrClosed {
+ channel.onReceiveCatching {
assertFalse(it.isClosed)
- assertEquals(42, it.value)
+ assertEquals(42, it.getOrNull())
finish(3)
}
}
diff --git a/kotlinx-coroutines-core/common/test/selects/SelectLoopTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectLoopTest.kt
index e31ccfc1..ba8f56ad 100644
--- a/kotlinx-coroutines-core/common/test/selects/SelectLoopTest.kt
+++ b/kotlinx-coroutines-core/common/test/selects/SelectLoopTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
@@ -27,7 +27,7 @@ class SelectLoopTest : TestBase() {
try {
while (true) {
select<Unit> {
- channel.onReceiveOrNull {
+ channel.onReceiveCatching {
expectUnreached()
}
job.onJoin {
@@ -40,4 +40,4 @@ class SelectLoopTest : TestBase() {
finish(4)
}
}
-} \ No newline at end of file
+}
diff --git a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt
index 2027630f..6a157676 100644
--- a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913
@@ -306,7 +306,7 @@ class SelectRendezvousChannelTest : TestBase() {
}
@Test
- fun testSelectReceiveOrClosedWaitClosed() = runTest {
+ fun testSelectReceiveCatchingWaitClosed() = runTest {
expect(1)
val channel = Channel<String>(Channel.RENDEZVOUS)
launch {
@@ -316,10 +316,10 @@ class SelectRendezvousChannelTest : TestBase() {
}
expect(2)
select<Unit> {
- channel.onReceiveOrClosed {
+ channel.onReceiveCatching {
expect(5)
assertTrue(it.isClosed)
- assertNull(it.closeCause)
+ assertNull(it.exceptionOrNull())
}
}
@@ -327,7 +327,7 @@ class SelectRendezvousChannelTest : TestBase() {
}
@Test
- fun testSelectReceiveOrClosedWaitClosedWithCause() = runTest {
+ fun testSelectReceiveCatchingWaitClosedWithCause() = runTest {
expect(1)
val channel = Channel<String>(Channel.RENDEZVOUS)
launch {
@@ -337,10 +337,10 @@ class SelectRendezvousChannelTest : TestBase() {
}
expect(2)
select<Unit> {
- channel.onReceiveOrClosed {
+ channel.onReceiveCatching {
expect(5)
assertTrue(it.isClosed)
- assertTrue(it.closeCause is TestException)
+ assertTrue(it.exceptionOrNull() is TestException)
}
}
@@ -348,31 +348,31 @@ class SelectRendezvousChannelTest : TestBase() {
}
@Test
- fun testSelectReceiveOrClosedForClosedChannel() = runTest {
+ fun testSelectReceiveCatchingForClosedChannel() = runTest {
val channel = Channel<Unit>()
channel.close()
expect(1)
select<Unit> {
expect(2)
- channel.onReceiveOrClosed {
+ channel.onReceiveCatching {
assertTrue(it.isClosed)
- assertNull(it.closeCause)
+ assertNull(it.exceptionOrNull())
finish(3)
}
}
}
@Test
- fun testSelectReceiveOrClosed() = runTest {
+ fun testSelectReceiveCatching() = runTest {
val channel = Channel<Int>(Channel.RENDEZVOUS)
val iterations = 10
expect(1)
val job = launch {
repeat(iterations) {
select<Unit> {
- channel.onReceiveOrClosed { v ->
+ channel.onReceiveCatching { v ->
expect(4 + it * 2)
- assertEquals(it, v.value)
+ assertEquals(it, v.getOrThrow())
}
}
}
@@ -390,15 +390,15 @@ class SelectRendezvousChannelTest : TestBase() {
}
@Test
- fun testSelectReceiveOrClosedDispatch() = runTest {
+ fun testSelectReceiveCatchingDispatch() = runTest {
val c = Channel<Int>(Channel.RENDEZVOUS)
expect(1)
launch {
expect(3)
val res = select<String> {
- c.onReceiveOrClosed { v ->
+ c.onReceiveCatching { v ->
expect(6)
- assertEquals(42, v.value)
+ assertEquals(42, v.getOrThrow())
yield() // back to main
expect(8)
"OK"