aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt')
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt30
1 files changed, 29 insertions, 1 deletions
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt
index 2a3ce046..692f0144 100644
--- a/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt
@@ -5,7 +5,9 @@
package kotlinx.coroutines.rx3
import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.disposables.*
import kotlinx.coroutines.*
+import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
@@ -101,7 +103,7 @@ class ObservableSingleTest : TestBase() {
@Test
fun testAwaitFirstOrNull() {
- val observable = rxObservable<String> {
+ val observable = rxObservable {
send(Observable.empty<String>().awaitFirstOrNull() ?: "OK")
}
@@ -154,6 +156,32 @@ class ObservableSingleTest : TestBase() {
}
}
+ /** Tests that calls to [awaitFirst] (and, thus, the other methods) throw [CancellationException] and dispose of
+ * the subscription when their [Job] is cancelled. */
+ @Test
+ fun testAwaitCancellation() = runTest {
+ expect(1)
+ val observable = ObservableSource<Int> { s ->
+ s.onSubscribe(object: Disposable {
+ override fun dispose() { expect(4) }
+ override fun isDisposed(): Boolean { expectUnreached(); return false }
+ })
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ try {
+ expect(2)
+ observable.awaitFirst()
+ } catch (e: CancellationException) {
+ expect(5)
+ throw e
+ }
+ }
+ expect(3)
+ job.cancelAndJoin()
+ finish(6)
+ }
+
+
@Test
fun testExceptionFromObservable() {
val observable = rxObservable {