aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt')
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt96
1 files changed, 86 insertions, 10 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt
index 08427dcf..f5d128d3 100644
--- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt
@@ -7,13 +7,12 @@ package kotlinx.coroutines.rx2
import io.reactivex.*
import io.reactivex.disposables.*
import io.reactivex.exceptions.*
-import io.reactivex.functions.*
import io.reactivex.internal.functions.Functions.*
import kotlinx.coroutines.*
+import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
-import java.util.concurrent.CancellationException
import kotlin.test.*
class MaybeTest : TestBase() {
@@ -47,7 +46,7 @@ class MaybeTest : TestBase() {
null
}
expect(2)
- maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, Action {
+ maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, {
expect(5)
})
expect(3)
@@ -112,18 +111,45 @@ class MaybeTest : TestBase() {
@Test
fun testMaybeAwait() = runBlocking {
- assertEquals("OK", Maybe.just("O").await() + "K")
+ assertEquals("OK", Maybe.just("O").awaitSingleOrNull() + "K")
+ assertEquals("OK", Maybe.just("O").awaitSingle() + "K")
}
@Test
- fun testMaybeAwaitForNull() = runBlocking {
- assertNull(Maybe.empty<String>().await())
+ fun testMaybeAwaitForNull(): Unit = runBlocking {
+ assertNull(Maybe.empty<String>().awaitSingleOrNull())
+ assertFailsWith<NoSuchElementException> { Maybe.empty<String>().awaitSingle() }
+ }
+
+ /** Tests that calls to [awaitSingleOrNull] throw [CancellationException] and dispose of the subscription when their
+ * [Job] is cancelled. */
+ @Test
+ fun testMaybeAwaitCancellation() = runTest {
+ expect(1)
+ val maybe = MaybeSource<Int> { s ->
+ s.onSubscribe(object: Disposable {
+ override fun dispose() { expect(4) }
+ override fun isDisposed(): Boolean { expectUnreached(); return false }
+ })
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ try {
+ expect(2)
+ maybe.awaitSingleOrNull()
+ } catch (e: CancellationException) {
+ expect(5)
+ throw e
+ }
+ }
+ expect(3)
+ job.cancelAndJoin()
+ finish(6)
}
@Test
fun testMaybeEmitAndAwait() {
val maybe = rxMaybe {
- Maybe.just("O").await() + "K"
+ Maybe.just("O").awaitSingleOrNull() + "K"
}
checkMaybeValue(maybe) {
@@ -205,7 +231,7 @@ class MaybeTest : TestBase() {
@Test
fun testCancelledConsumer() = runTest {
expect(1)
- val maybe = rxMaybe<Int>(currentDispatcher()) {
+ val maybe = rxMaybe(currentDispatcher()) {
expect(4)
try {
delay(Long.MAX_VALUE)
@@ -228,6 +254,56 @@ class MaybeTest : TestBase() {
finish(7)
}
+ /** Tests the simple scenario where the Maybe doesn't output a value. */
+ @Test
+ fun testMaybeCollectEmpty() = runTest {
+ expect(1)
+ Maybe.empty<Int>().collect {
+ expectUnreached()
+ }
+ finish(2)
+ }
+
+ /** Tests the simple scenario where the Maybe doesn't output a value. */
+ @Test
+ fun testMaybeCollectSingle() = runTest {
+ expect(1)
+ Maybe.just("OK").collect {
+ assertEquals("OK", it)
+ expect(2)
+ }
+ finish(3)
+ }
+
+ /** Tests the behavior of [collect] when the Maybe raises an error. */
+ @Test
+ fun testMaybeCollectThrowingMaybe() = runTest {
+ expect(1)
+ try {
+ Maybe.error<Int>(TestException()).collect {
+ expectUnreached()
+ }
+ } catch (e: TestException) {
+ expect(2)
+ }
+ finish(3)
+ }
+
+ /** Tests the behavior of [collect] when the action throws. */
+ @Test
+ fun testMaybeCollectThrowingAction() = runTest {
+ expect(1)
+ try {
+ Maybe.just("OK").collect {
+ expect(2)
+ throw TestException()
+ }
+ } catch (e: TestException) {
+ expect(3)
+ }
+ finish(4)
+ }
+
@Test
fun testSuppressedException() = runTest {
val maybe = rxMaybe(currentDispatcher()) {
@@ -241,7 +317,7 @@ class MaybeTest : TestBase() {
}
}
try {
- maybe.await()
+ maybe.awaitSingleOrNull()
expectUnreached()
} catch (e: TestException) {
assertTrue(e.suppressed[0] is TestException2)
@@ -301,7 +377,7 @@ class MaybeTest : TestBase() {
rxMaybe(Dispatchers.Unconfined) {
expect(1)
42
- }.subscribe({ throw LinkageError() })
+ }.subscribe { throw LinkageError() }
finish(3)
}
}