diff options
Diffstat (limited to 'kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt')
-rw-r--r-- | kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt | 435 |
1 files changed, 435 insertions, 0 deletions
diff --git a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt new file mode 100644 index 00000000..6072cc2c --- /dev/null +++ b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt @@ -0,0 +1,435 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ +@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913 + +package kotlinx.coroutines.selects + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.intrinsics.* +import kotlin.test.* + +class SelectRendezvousChannelTest : TestBase() { + + @Test + fun testSelectSendSuccess() = runTest { + expect(1) + val channel = Channel<String>(Channel.RENDEZVOUS) + launch { + expect(2) + assertEquals("OK", channel.receive()) + finish(6) + } + yield() // to launched coroutine + expect(3) + select<Unit> { + channel.onSend("OK") { + expect(4) + } + } + expect(5) + } + + @Test + fun testSelectSendSuccessWithDefault() = runTest { + expect(1) + val channel = Channel<String>(Channel.RENDEZVOUS) + launch { + expect(2) + assertEquals("OK", channel.receive()) + finish(6) + } + yield() // to launched coroutine + expect(3) + select<Unit> { + channel.onSend("OK") { + expect(4) + } + default { + expectUnreached() + } + } + expect(5) + } + + @Test + fun testSelectSendWaitWithDefault() = runTest { + expect(1) + val channel = Channel<String>(Channel.RENDEZVOUS) + select<Unit> { + channel.onSend("OK") { + expectUnreached() + } + default { + expect(2) + } + } + expect(3) + // make sure receive blocks (select above is over) + launch { + expect(5) + assertEquals("CHK", channel.receive()) + finish(8) + } + expect(4) + yield() + expect(6) + channel.send("CHK") + expect(7) + } + + @Test + fun testSelectSendWait() = runTest { + expect(1) + val channel = Channel<String>(Channel.RENDEZVOUS) + launch { + expect(3) + assertEquals("OK", channel.receive()) + expect(4) + } + expect(2) + select<Unit> { + channel.onSend("OK") { + expect(5) + } + } + finish(6) + } + + @Test + fun testSelectReceiveSuccess() = runTest { + expect(1) + val channel = Channel<String>(Channel.RENDEZVOUS) + launch { + expect(2) + channel.send("OK") + finish(6) + } + yield() // to launched coroutine + expect(3) + select<Unit> { + channel.onReceive { v -> + expect(4) + assertEquals("OK", v) + } + } + expect(5) + } + + @Test + fun testSelectReceiveSuccessWithDefault() = runTest { + expect(1) + val channel = Channel<String>(Channel.RENDEZVOUS) + launch { + expect(2) + channel.send("OK") + finish(6) + } + yield() // to launched coroutine + expect(3) + select<Unit> { + channel.onReceive { v -> + expect(4) + assertEquals("OK", v) + } + default { + expectUnreached() + } + } + expect(5) + } + + @Test + fun testSelectReceiveWaitWithDefault() = runTest { + expect(1) + val channel = Channel<String>(Channel.RENDEZVOUS) + select<Unit> { + channel.onReceive { + expectUnreached() + } + default { + expect(2) + } + } + expect(3) + // make sure send blocks (select above is over) + launch { + expect(5) + channel.send("CHK") + finish(8) + } + expect(4) + yield() + expect(6) + assertEquals("CHK", channel.receive()) + expect(7) + } + + @Test + fun testSelectReceiveWait() = runTest { + expect(1) + val channel = Channel<String>(Channel.RENDEZVOUS) + launch { + expect(3) + channel.send("OK") + expect(4) + } + expect(2) + select<Unit> { + channel.onReceive { v -> + expect(5) + assertEquals("OK", v) + } + } + finish(6) + } + + @Test + fun testSelectReceiveClosed() = runTest(expected = { it is ClosedReceiveChannelException }) { + expect(1) + val channel = Channel<String>(Channel.RENDEZVOUS) + channel.close() + finish(2) + select<Unit> { + channel.onReceive { + expectUnreached() + } + } + expectUnreached() + } + + @Test + fun testSelectReceiveWaitClosed() = runTest(expected = {it is ClosedReceiveChannelException}) { + expect(1) + val channel = Channel<String>(Channel.RENDEZVOUS) + launch { + expect(3) + channel.close() + finish(4) + } + expect(2) + select<Unit> { + channel.onReceive { + expectUnreached() + } + } + expectUnreached() + } + + @Test + fun testSelectSendResourceCleanup() = runTest { + val channel = Channel<Int>(Channel.RENDEZVOUS) + val n = 1_000 + expect(1) + repeat(n) { i -> + select { + channel.onSend(i) { expectUnreached() } + default { expect(i + 2) } + } + } + finish(n + 2) + } + + @Test + fun testSelectReceiveResourceCleanup() = runTest { + val channel = Channel<Int>(Channel.RENDEZVOUS) + val n = 1_000 + expect(1) + repeat(n) { i -> + select { + channel.onReceive { expectUnreached() } + default { expect(i + 2) } + } + } + finish(n + 2) + } + + @Test + fun testSelectAtomicFailure() = runTest { + val c1 = Channel<Int>(Channel.RENDEZVOUS) + val c2 = Channel<Int>(Channel.RENDEZVOUS) + expect(1) + launch { + expect(3) + val res = select<String> { + c1.onReceive { v1 -> + expect(4) + assertEquals(42, v1) + yield() // back to main + expect(7) + "OK" + } + c2.onReceive { + "FAIL" + } + } + expect(8) + assertEquals("OK", res) + } + expect(2) + c1.send(42) // send to coroutine, suspends + expect(5) + c2.close() // makes sure that selected expression does not fail! + expect(6) + yield() // back + finish(9) + } + + @Test + fun testSelectWaitDispatch() = runTest { + val c = Channel<Int>(Channel.RENDEZVOUS) + expect(1) + launch { + expect(3) + val res = select<String> { + c.onReceive { v -> + expect(6) + assertEquals(42, v) + yield() // back to main + expect(8) + "OK" + } + } + expect(9) + assertEquals("OK", res) + } + expect(2) + yield() // to launch + expect(4) + c.send(42) // do not suspend + expect(5) + yield() // to receive + expect(7) + yield() // again + finish(10) + } + + @Test + fun testSelectReceiveOrClosedWaitClosed() = runTest { + expect(1) + val channel = Channel<String>(Channel.RENDEZVOUS) + launch { + expect(3) + channel.close() + expect(4) + } + expect(2) + select<Unit> { + channel.onReceiveOrClosed { + expect(5) + assertTrue(it.isClosed) + assertNull(it.closeCause) + } + } + + finish(6) + } + + @Test + fun testSelectReceiveOrClosedWaitClosedWithCause() = runTest { + expect(1) + val channel = Channel<String>(Channel.RENDEZVOUS) + launch { + expect(3) + channel.close(TestException()) + expect(4) + } + expect(2) + select<Unit> { + channel.onReceiveOrClosed { + expect(5) + assertTrue(it.isClosed) + assertTrue(it.closeCause is TestException) + } + } + + finish(6) + } + + @Test + fun testSelectReceiveOrClosed() = runTest { + val channel = Channel<Int>(Channel.RENDEZVOUS) + val iterations = 10 + expect(1) + val job = launch { + repeat(iterations) { + select<Unit> { + channel.onReceiveOrClosed { v -> + expect(4 + it * 2) + assertEquals(it, v.value) + } + } + } + } + + expect(2) + repeat(iterations) { + expect(3 + it * 2) + channel.send(it) + yield() + } + + job.join() + finish(3 + iterations * 2) + } + + @Test + fun testSelectReceiveOrClosedDispatch() = runTest { + val c = Channel<Int>(Channel.RENDEZVOUS) + expect(1) + launch { + expect(3) + val res = select<String> { + c.onReceiveOrClosed { v -> + expect(6) + assertEquals(42, v.value) + yield() // back to main + expect(8) + "OK" + } + } + expect(9) + assertEquals("OK", res) + } + expect(2) + yield() // to launch + expect(4) + c.send(42) // do not suspend + expect(5) + yield() // to receive + expect(7) + yield() // again + finish(10) + } + + @Test + fun testSelectSendWhenClosed() = runTest { + expect(1) + val c = Channel<Int>(Channel.RENDEZVOUS) + val sender = launch(start = CoroutineStart.UNDISPATCHED) { + expect(2) + c.send(1) // enqueue sender + expectUnreached() + } + c.close() // then close + assertFailsWith<ClosedSendChannelException> { + // select sender should fail + expect(3) + select { + c.onSend(2) { + expectUnreached() + } + } + } + sender.cancel() + finish(4) + } + + // only for debugging + internal fun <R> SelectBuilder<R>.default(block: suspend () -> R) { + this as SelectBuilderImpl // type assertion + if (!trySelect(null)) return + block.startCoroutineUnintercepted(this) + } +} |