aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-rx3/test/SingleTest.kt')
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/SingleTest.kt28
1 files changed, 27 insertions, 1 deletions
diff --git a/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt
index 46bcaf84..3e763aa5 100644
--- a/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt
@@ -9,6 +9,7 @@ import io.reactivex.rxjava3.disposables.*
import io.reactivex.rxjava3.exceptions.*
import io.reactivex.rxjava3.functions.*
import kotlinx.coroutines.*
+import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
@@ -98,6 +99,31 @@ class SingleTest : TestBase() {
assertEquals("OK", Single.just("O").await() + "K")
}
+ /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their
+ * [Job] is cancelled. */
+ @Test
+ fun testSingleAwaitCancellation() = runTest {
+ expect(1)
+ val single = SingleSource<Int> { s ->
+ s.onSubscribe(object: Disposable {
+ override fun dispose() { expect(4) }
+ override fun isDisposed(): Boolean { expectUnreached(); return false }
+ })
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ try {
+ expect(2)
+ single.await()
+ } catch (e: CancellationException) {
+ expect(5)
+ throw e
+ }
+ }
+ expect(3)
+ job.cancelAndJoin()
+ finish(6)
+ }
+
@Test
fun testSingleEmitAndAwait() {
val single = rxSingle {
@@ -221,7 +247,7 @@ class SingleTest : TestBase() {
fun testFatalExceptionInSingle() = runTest {
rxSingle(Dispatchers.Unconfined) {
throw LinkageError()
- }.subscribe({ _, e -> assertTrue(e is LinkageError); expect(1) })
+ }.subscribe { _, e -> assertTrue(e is LinkageError); expect(1) }
finish(2)
}