aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-rx3/test
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-rx3/test')
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt30
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt16
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt15
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt98
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/ObservableCollectTest.kt68
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt37
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt30
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt2
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt20
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/SingleTest.kt28
10 files changed, 296 insertions, 48 deletions
diff --git a/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt
index e5399d16..cfdb6d41 100644
--- a/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt
@@ -98,6 +98,31 @@ class CompletableTest : TestBase() {
}
}
+ /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their [Job] is
+ * cancelled. */
+ @Test
+ fun testAwaitCancellation() = runTest {
+ expect(1)
+ val completable = CompletableSource { s ->
+ s.onSubscribe(object: Disposable {
+ override fun dispose() { expect(4) }
+ override fun isDisposed(): Boolean { expectUnreached(); return false }
+ })
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ try {
+ expect(2)
+ completable.await()
+ } catch (e: CancellationException) {
+ expect(5)
+ throw e
+ }
+ }
+ expect(3)
+ job.cancelAndJoin()
+ finish(6)
+ }
+
@Test
fun testSuppressedException() = runTest {
val completable = rxCompletable(currentDispatcher()) {
@@ -119,7 +144,7 @@ class CompletableTest : TestBase() {
}
@Test
- fun testUnhandledException() = runTest() {
+ fun testUnhandledException() = runTest {
expect(1)
var disposable: Disposable? = null
val handler = { e: Throwable ->
@@ -165,8 +190,7 @@ class CompletableTest : TestBase() {
withExceptionHandler(handler) {
rxCompletable(Dispatchers.Unconfined) {
expect(1)
- 42
- }.subscribe({ throw LinkageError() })
+ }.subscribe { throw LinkageError() }
finish(3)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt
index 8cbd7ee8..126cb818 100644
--- a/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt
@@ -38,16 +38,16 @@ class FlowableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) {
+ fun testFatalException() = withExceptionHandler({ expectUnreached() }) {
rxFlowable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
}.subscribe({
expectUnreached()
}, {
- expect(2) // Fatal exception is reported to both onError and CEH
+ expect(2) // Fatal exceptions are not treated as special
})
- finish(4)
+ finish(3)
}
@Test
@@ -66,7 +66,7 @@ class FlowableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) {
+ fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
rxFlowable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
@@ -77,19 +77,19 @@ class FlowableExceptionHandlingTest : TestBase() {
}, {
expect(2)
})
- finish(4)
+ finish(3)
}
@Test
- fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
rxFlowable(Dispatchers.Unconfined) {
expect(1)
send(Unit)
}.subscribe({
expect(2)
throw LinkageError()
- }, { expect(3) }) // Fatal exception is reported to both onError and CEH
- finish(5)
+ }, { expectUnreached() }) // Fatal exception is rethrown from `onNext` => the subscription is thought to be cancelled
+ finish(4)
}
@Test
diff --git a/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt
index 395672ce..1302124f 100644
--- a/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt
@@ -125,6 +125,21 @@ class IntegrationTest(
finish(3)
}
+ @Test
+ fun testObservableWithTimeout() = runTest {
+ val observable = rxObservable<Int> {
+ expect(2)
+ withTimeout(1) { delay(100) }
+ }
+ try {
+ expect(1)
+ observable.awaitFirstOrNull()
+ } catch (e: CancellationException) {
+ expect(3)
+ }
+ finish(4)
+ }
+
private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
var last = 0
observable.collect {
diff --git a/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt
index e0cec748..bea939ef 100644
--- a/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt
@@ -7,13 +7,12 @@ package kotlinx.coroutines.rx3
import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.disposables.*
import io.reactivex.rxjava3.exceptions.*
-import io.reactivex.rxjava3.functions.*
import io.reactivex.rxjava3.internal.functions.Functions.*
import kotlinx.coroutines.*
+import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
-import java.util.concurrent.CancellationException
import kotlin.test.*
class MaybeTest : TestBase() {
@@ -47,7 +46,7 @@ class MaybeTest : TestBase() {
null
}
expect(2)
- maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, Action {
+ maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, {
expect(5)
})
expect(3)
@@ -85,7 +84,7 @@ class MaybeTest : TestBase() {
expectUnreached()
}
expect(2)
- // nothing is called on a disposed rx3 maybe
+ // nothing is called on a disposed rx2 maybe
val sub = maybe.subscribe({
expectUnreached()
}, {
@@ -112,18 +111,45 @@ class MaybeTest : TestBase() {
@Test
fun testMaybeAwait() = runBlocking {
- assertEquals("OK", Maybe.just("O").await() + "K")
+ assertEquals("OK", Maybe.just("O").awaitSingleOrNull() + "K")
+ assertEquals("OK", Maybe.just("O").awaitSingle() + "K")
}
@Test
- fun testMaybeAwaitForNull() = runBlocking {
- assertNull(Maybe.empty<String>().await())
+ fun testMaybeAwaitForNull(): Unit = runBlocking {
+ assertNull(Maybe.empty<String>().awaitSingleOrNull())
+ assertFailsWith<NoSuchElementException> { Maybe.empty<String>().awaitSingle() }
+ }
+
+ /** Tests that calls to [awaitSingleOrNull] throw [CancellationException] and dispose of the subscription when their
+ * [Job] is cancelled. */
+ @Test
+ fun testMaybeAwaitCancellation() = runTest {
+ expect(1)
+ val maybe = MaybeSource<Int> { s ->
+ s.onSubscribe(object: Disposable {
+ override fun dispose() { expect(4) }
+ override fun isDisposed(): Boolean { expectUnreached(); return false }
+ })
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ try {
+ expect(2)
+ maybe.awaitSingleOrNull()
+ } catch (e: CancellationException) {
+ expect(5)
+ throw e
+ }
+ }
+ expect(3)
+ job.cancelAndJoin()
+ finish(6)
}
@Test
fun testMaybeEmitAndAwait() {
val maybe = rxMaybe {
- Maybe.just("O").await() + "K"
+ Maybe.just("O").awaitSingleOrNull() + "K"
}
checkMaybeValue(maybe) {
@@ -205,7 +231,7 @@ class MaybeTest : TestBase() {
@Test
fun testCancelledConsumer() = runTest {
expect(1)
- val maybe = rxMaybe<Int>(currentDispatcher()) {
+ val maybe = rxMaybe(currentDispatcher()) {
expect(4)
try {
delay(Long.MAX_VALUE)
@@ -228,6 +254,56 @@ class MaybeTest : TestBase() {
finish(7)
}
+ /** Tests the simple scenario where the Maybe doesn't output a value. */
+ @Test
+ fun testMaybeCollectEmpty() = runTest {
+ expect(1)
+ Maybe.empty<Int>().collect {
+ expectUnreached()
+ }
+ finish(2)
+ }
+
+ /** Tests the simple scenario where the Maybe doesn't output a value. */
+ @Test
+ fun testMaybeCollectSingle() = runTest {
+ expect(1)
+ Maybe.just("OK").collect {
+ assertEquals("OK", it)
+ expect(2)
+ }
+ finish(3)
+ }
+
+ /** Tests the behavior of [collect] when the Maybe raises an error. */
+ @Test
+ fun testMaybeCollectThrowingMaybe() = runTest {
+ expect(1)
+ try {
+ Maybe.error<Int>(TestException()).collect {
+ expectUnreached()
+ }
+ } catch (e: TestException) {
+ expect(2)
+ }
+ finish(3)
+ }
+
+ /** Tests the behavior of [collect] when the action throws. */
+ @Test
+ fun testMaybeCollectThrowingAction() = runTest {
+ expect(1)
+ try {
+ Maybe.just("OK").collect {
+ expect(2)
+ throw TestException()
+ }
+ } catch (e: TestException) {
+ expect(3)
+ }
+ finish(4)
+ }
+
@Test
fun testSuppressedException() = runTest {
val maybe = rxMaybe(currentDispatcher()) {
@@ -241,7 +317,7 @@ class MaybeTest : TestBase() {
}
}
try {
- maybe.await()
+ maybe.awaitSingleOrNull()
expectUnreached()
} catch (e: TestException) {
assertTrue(e.suppressed[0] is TestException2)
@@ -301,7 +377,7 @@ class MaybeTest : TestBase() {
rxMaybe(Dispatchers.Unconfined) {
expect(1)
42
- }.subscribe({ throw LinkageError() })
+ }.subscribe { throw LinkageError() }
finish(3)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableCollectTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableCollectTest.kt
new file mode 100644
index 00000000..680786f9
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableCollectTest.kt
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.ObservableSource
+import io.reactivex.rxjava3.disposables.*
+import kotlinx.coroutines.*
+import org.junit.Test
+import kotlin.test.*
+
+class ObservableCollectTest: TestBase() {
+
+ /** Tests the behavior of [collect] when the publisher raises an error. */
+ @Test
+ fun testObservableCollectThrowingObservable() = runTest {
+ expect(1)
+ var sum = 0
+ try {
+ rxObservable {
+ for (i in 0..100) {
+ send(i)
+ }
+ throw TestException()
+ }.collect {
+ sum += it
+ }
+ } catch (e: TestException) {
+ assertTrue(sum > 0)
+ finish(2)
+ }
+ }
+
+ @Test
+ fun testObservableCollectThrowingAction() = runTest {
+ expect(1)
+ var sum = 0
+ val expectedSum = 5
+ try {
+ var disposed = false
+ ObservableSource<Int> { observer ->
+ launch(Dispatchers.Default) {
+ observer.onSubscribe(object : Disposable {
+ override fun dispose() {
+ disposed = true
+ expect(expectedSum + 2)
+ }
+
+ override fun isDisposed(): Boolean = disposed
+ })
+ while (!disposed) {
+ observer.onNext(1)
+ }
+ }
+ }.collect {
+ expect(sum + 2)
+ sum += it
+ if (sum == expectedSum) {
+ throw TestException()
+ }
+ }
+ } catch (e: TestException) {
+ assertEquals(expectedSum, sum)
+ finish(expectedSum + 3)
+ }
+ }
+} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt
index 1183b2ae..5ddb36ed 100644
--- a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt
@@ -8,6 +8,7 @@ import io.reactivex.rxjava3.exceptions.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
+import java.util.concurrent.*
import kotlin.test.*
class ObservableExceptionHandlingTest : TestBase() {
@@ -18,7 +19,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}
private inline fun <reified T : Throwable> handler(expect: Int) = { t: Throwable ->
- assertTrue(t is UndeliverableException && t.cause is T)
+ assertTrue(t is UndeliverableException && t.cause is T, "$t")
expect(expect)
}
@@ -38,8 +39,8 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) {
- rxObservable<Int>(Dispatchers.Unconfined) {
+ fun testFatalException() = withExceptionHandler({ expectUnreached() }) {
+ rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) {
expect(1)
throw LinkageError()
}.subscribe({
@@ -47,7 +48,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}, {
expect(2)
})
- finish(4)
+ finish(3)
}
@Test
@@ -66,7 +67,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) {
+ fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
rxObservable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
@@ -75,20 +76,28 @@ class ObservableExceptionHandlingTest : TestBase() {
.subscribe({
expectUnreached()
}, {
- expect(2) // Fatal exception is not reported in onError
+ expect(2) // Fatal exceptions are not treated in a special manner
})
- finish(4)
+ finish(3)
}
@Test
- fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
+ val latch = CountDownLatch(1)
rxObservable(Dispatchers.Unconfined) {
expect(1)
- send(Unit)
+ val result = trySend(Unit)
+ val exception = result.exceptionOrNull()
+ assertTrue(exception is UndeliverableException)
+ assertTrue(exception.cause is LinkageError)
+ assertTrue(isClosedForSend)
+ expect(4)
+ latch.countDown()
}.subscribe({
expect(2)
throw LinkageError()
- }, { expect(3) }) // Unreached because fatal errors are rethrown
+ }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw.
+ latch.await()
finish(5)
}
@@ -100,7 +109,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}.subscribe({
expect(2)
throw TestException()
- }, { expect(3) }) // not reported to onError because came from the subscribe itself
+ }, { expect(3) })
finish(4)
}
@@ -119,7 +128,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
rxObservable(Dispatchers.Unconfined) {
expect(1)
send(Unit)
@@ -128,7 +137,7 @@ class ObservableExceptionHandlingTest : TestBase() {
.subscribe({
expect(2)
throw LinkageError()
- }, { expect(3) })
- finish(5)
+ }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw.
+ finish(4)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt
index 2a3ce046..692f0144 100644
--- a/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt
@@ -5,7 +5,9 @@
package kotlinx.coroutines.rx3
import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.disposables.*
import kotlinx.coroutines.*
+import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
@@ -101,7 +103,7 @@ class ObservableSingleTest : TestBase() {
@Test
fun testAwaitFirstOrNull() {
- val observable = rxObservable<String> {
+ val observable = rxObservable {
send(Observable.empty<String>().awaitFirstOrNull() ?: "OK")
}
@@ -154,6 +156,32 @@ class ObservableSingleTest : TestBase() {
}
}
+ /** Tests that calls to [awaitFirst] (and, thus, the other methods) throw [CancellationException] and dispose of
+ * the subscription when their [Job] is cancelled. */
+ @Test
+ fun testAwaitCancellation() = runTest {
+ expect(1)
+ val observable = ObservableSource<Int> { s ->
+ s.onSubscribe(object: Disposable {
+ override fun dispose() { expect(4) }
+ override fun isDisposed(): Boolean { expectUnreached(); return false }
+ })
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ try {
+ expect(2)
+ observable.awaitFirst()
+ } catch (e: CancellationException) {
+ expect(5)
+ throw e
+ }
+ }
+ expect(3)
+ job.cancelAndJoin()
+ finish(6)
+ }
+
+
@Test
fun testExceptionFromObservable() {
val observable = rxObservable {
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt
index 431a7a78..01d6a20e 100644
--- a/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt
@@ -27,7 +27,7 @@ class ObservableSourceAsFlowStressTest : TestBase() {
val latch = Channel<Unit>(1)
var i = 0
val observable = Observable.interval(100L, TimeUnit.MICROSECONDS)
- .doOnNext { if (++i > 100) latch.offer(Unit) }
+ .doOnNext { if (++i > 100) latch.trySend(Unit) }
val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default))
latch.receive()
job.cancelAndJoin()
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt
index 2f043161..58a54616 100644
--- a/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt
@@ -1,12 +1,14 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.rx3
import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import org.junit.Test
+import kotlin.onSuccess
import kotlin.test.*
class ObservableSubscriptionSelectTest : TestBase() {
@@ -22,23 +24,23 @@ class ObservableSubscriptionSelectTest : TestBase() {
val channelB = source.openSubscription()
loop@ while (true) {
val done: Int = select {
- channelA.onReceiveOrNull {
- if (it != null) assertEquals(a++, it)
- if (it == null) 0 else 1
+ channelA.onReceiveCatching { result ->
+ result.onSuccess { assertEquals(a++, it) }
+ if (result.isSuccess) 1 else 0
}
- channelB.onReceiveOrNull {
- if (it != null) assertEquals(b++, it)
- if (it == null) 0 else 2
+ channelB.onReceiveCatching { result ->
+ result.onSuccess { assertEquals(b++, it) }
+ if (result.isSuccess) 2 else 0
}
}
when (done) {
0 -> break@loop
1 -> {
- val r = channelB.receiveOrNull()
+ val r = channelB.receiveCatching().getOrNull()
if (r != null) assertEquals(b++, r)
}
2 -> {
- val r = channelA.receiveOrNull()
+ val r = channelA.receiveCatching().getOrNull()
if (r != null) assertEquals(a++, r)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt
index 46bcaf84..3e763aa5 100644
--- a/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt
@@ -9,6 +9,7 @@ import io.reactivex.rxjava3.disposables.*
import io.reactivex.rxjava3.exceptions.*
import io.reactivex.rxjava3.functions.*
import kotlinx.coroutines.*
+import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
@@ -98,6 +99,31 @@ class SingleTest : TestBase() {
assertEquals("OK", Single.just("O").await() + "K")
}
+ /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their
+ * [Job] is cancelled. */
+ @Test
+ fun testSingleAwaitCancellation() = runTest {
+ expect(1)
+ val single = SingleSource<Int> { s ->
+ s.onSubscribe(object: Disposable {
+ override fun dispose() { expect(4) }
+ override fun isDisposed(): Boolean { expectUnreached(); return false }
+ })
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ try {
+ expect(2)
+ single.await()
+ } catch (e: CancellationException) {
+ expect(5)
+ throw e
+ }
+ }
+ expect(3)
+ job.cancelAndJoin()
+ finish(6)
+ }
+
@Test
fun testSingleEmitAndAwait() {
val single = rxSingle {
@@ -221,7 +247,7 @@ class SingleTest : TestBase() {
fun testFatalExceptionInSingle() = runTest {
rxSingle(Dispatchers.Unconfined) {
throw LinkageError()
- }.subscribe({ _, e -> assertTrue(e is LinkageError); expect(1) })
+ }.subscribe { _, e -> assertTrue(e is LinkageError); expect(1) }
finish(2)
}