diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt')
-rw-r--r-- | reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt | 30 |
1 files changed, 29 insertions, 1 deletions
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt index 2a3ce046..692f0144 100644 --- a/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt @@ -5,7 +5,9 @@ package kotlinx.coroutines.rx3 import io.reactivex.rxjava3.core.* +import io.reactivex.rxjava3.disposables.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* @@ -101,7 +103,7 @@ class ObservableSingleTest : TestBase() { @Test fun testAwaitFirstOrNull() { - val observable = rxObservable<String> { + val observable = rxObservable { send(Observable.empty<String>().awaitFirstOrNull() ?: "OK") } @@ -154,6 +156,32 @@ class ObservableSingleTest : TestBase() { } } + /** Tests that calls to [awaitFirst] (and, thus, the other methods) throw [CancellationException] and dispose of + * the subscription when their [Job] is cancelled. */ + @Test + fun testAwaitCancellation() = runTest { + expect(1) + val observable = ObservableSource<Int> { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + observable.awaitFirst() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) + } + + @Test fun testExceptionFromObservable() { val observable = rxObservable { |