diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-rx2/test')
10 files changed, 301 insertions, 51 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt index 298b32bf..16f0005b 100644 --- a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt @@ -98,6 +98,31 @@ class CompletableTest : TestBase() { } } + /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their [Job] is + * cancelled. */ + @Test + fun testAwaitCancellation() = runTest { + expect(1) + val completable = CompletableSource { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + completable.await() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) + } + @Test fun testSuppressedException() = runTest { val completable = rxCompletable(currentDispatcher()) { @@ -119,7 +144,7 @@ class CompletableTest : TestBase() { } @Test - fun testUnhandledException() = runTest() { + fun testUnhandledException() = runTest { expect(1) var disposable: Disposable? = null val handler = { e: Throwable -> @@ -165,8 +190,7 @@ class CompletableTest : TestBase() { withExceptionHandler(handler) { rxCompletable(Dispatchers.Unconfined) { expect(1) - 42 - }.subscribe({ throw LinkageError() }) + }.subscribe { throw LinkageError() } finish(3) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt index 05b7ee92..31643929 100644 --- a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt @@ -38,16 +38,16 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) { + fun testFatalException() = withExceptionHandler({ expectUnreached() }) { rxFlowable<Int>(Dispatchers.Unconfined) { expect(1) throw LinkageError() }.subscribe({ expectUnreached() }, { - expect(2) // Fatal exception is reported to both onError and CEH + expect(2) // Fatal exceptions are not treated as special }) - finish(4) + finish(3) } @Test @@ -66,7 +66,7 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) { + fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) { rxFlowable<Int>(Dispatchers.Unconfined) { expect(1) throw LinkageError() @@ -77,19 +77,19 @@ class FlowableExceptionHandlingTest : TestBase() { }, { expect(2) }) - finish(4) + finish(3) } @Test - fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) { + fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) { rxFlowable(Dispatchers.Unconfined) { expect(1) send(Unit) }.subscribe({ expect(2) throw LinkageError() - }, { expect(3) }) // Fatal exception is reported to both onError and CEH - finish(5) + }, { expectUnreached() }) // Fatal exception is rethrown from `onNext` => the subscription is thought to be cancelled + finish(4) } @Test diff --git a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt index 540fa76b..8a6362ad 100644 --- a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* +import kotlinx.coroutines.reactive.* import org.junit.Test import org.junit.runner.* import org.junit.runners.* @@ -92,7 +93,7 @@ class IntegrationTest( assertEquals(n, observable.awaitLast()) assertFailsWith<IllegalArgumentException> { observable.awaitSingle() } checkNumbers(n, observable) - val channel = observable.openSubscription() + val channel = observable.toChannel() checkNumbers(n, channel.consumeAsFlow().asObservable(ctx(coroutineContext))) channel.cancel() } @@ -124,6 +125,21 @@ class IntegrationTest( finish(3) } + @Test + fun testObservableWithTimeout() = runTest { + val observable = rxObservable<Int> { + expect(2) + withTimeout(1) { delay(100) } + } + try { + expect(1) + observable.awaitFirstOrNull() + } catch (e: CancellationException) { + expect(3) + } + finish(4) + } + private suspend fun checkNumbers(n: Int, observable: Observable<Int>) { var last = 0 observable.collect { diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt index 08427dcf..f5d128d3 100644 --- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt @@ -7,13 +7,12 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import io.reactivex.disposables.* import io.reactivex.exceptions.* -import io.reactivex.functions.* import io.reactivex.internal.functions.Functions.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* -import java.util.concurrent.CancellationException import kotlin.test.* class MaybeTest : TestBase() { @@ -47,7 +46,7 @@ class MaybeTest : TestBase() { null } expect(2) - maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, Action { + maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, { expect(5) }) expect(3) @@ -112,18 +111,45 @@ class MaybeTest : TestBase() { @Test fun testMaybeAwait() = runBlocking { - assertEquals("OK", Maybe.just("O").await() + "K") + assertEquals("OK", Maybe.just("O").awaitSingleOrNull() + "K") + assertEquals("OK", Maybe.just("O").awaitSingle() + "K") } @Test - fun testMaybeAwaitForNull() = runBlocking { - assertNull(Maybe.empty<String>().await()) + fun testMaybeAwaitForNull(): Unit = runBlocking { + assertNull(Maybe.empty<String>().awaitSingleOrNull()) + assertFailsWith<NoSuchElementException> { Maybe.empty<String>().awaitSingle() } + } + + /** Tests that calls to [awaitSingleOrNull] throw [CancellationException] and dispose of the subscription when their + * [Job] is cancelled. */ + @Test + fun testMaybeAwaitCancellation() = runTest { + expect(1) + val maybe = MaybeSource<Int> { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + maybe.awaitSingleOrNull() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) } @Test fun testMaybeEmitAndAwait() { val maybe = rxMaybe { - Maybe.just("O").await() + "K" + Maybe.just("O").awaitSingleOrNull() + "K" } checkMaybeValue(maybe) { @@ -205,7 +231,7 @@ class MaybeTest : TestBase() { @Test fun testCancelledConsumer() = runTest { expect(1) - val maybe = rxMaybe<Int>(currentDispatcher()) { + val maybe = rxMaybe(currentDispatcher()) { expect(4) try { delay(Long.MAX_VALUE) @@ -228,6 +254,56 @@ class MaybeTest : TestBase() { finish(7) } + /** Tests the simple scenario where the Maybe doesn't output a value. */ + @Test + fun testMaybeCollectEmpty() = runTest { + expect(1) + Maybe.empty<Int>().collect { + expectUnreached() + } + finish(2) + } + + /** Tests the simple scenario where the Maybe doesn't output a value. */ + @Test + fun testMaybeCollectSingle() = runTest { + expect(1) + Maybe.just("OK").collect { + assertEquals("OK", it) + expect(2) + } + finish(3) + } + + /** Tests the behavior of [collect] when the Maybe raises an error. */ + @Test + fun testMaybeCollectThrowingMaybe() = runTest { + expect(1) + try { + Maybe.error<Int>(TestException()).collect { + expectUnreached() + } + } catch (e: TestException) { + expect(2) + } + finish(3) + } + + /** Tests the behavior of [collect] when the action throws. */ + @Test + fun testMaybeCollectThrowingAction() = runTest { + expect(1) + try { + Maybe.just("OK").collect { + expect(2) + throw TestException() + } + } catch (e: TestException) { + expect(3) + } + finish(4) + } + @Test fun testSuppressedException() = runTest { val maybe = rxMaybe(currentDispatcher()) { @@ -241,7 +317,7 @@ class MaybeTest : TestBase() { } } try { - maybe.await() + maybe.awaitSingleOrNull() expectUnreached() } catch (e: TestException) { assertTrue(e.suppressed[0] is TestException2) @@ -301,7 +377,7 @@ class MaybeTest : TestBase() { rxMaybe(Dispatchers.Unconfined) { expect(1) 42 - }.subscribe({ throw LinkageError() }) + }.subscribe { throw LinkageError() } finish(3) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableCollectTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableCollectTest.kt new file mode 100644 index 00000000..508f594a --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableCollectTest.kt @@ -0,0 +1,69 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import io.reactivex.* +import io.reactivex.disposables.* +import kotlinx.coroutines.* +import org.junit.Test +import kotlin.test.* + +class ObservableCollectTest: TestBase() { + + /** Tests the behavior of [collect] when the publisher raises an error. */ + @Test + fun testObservableCollectThrowingObservable() = runTest { + expect(1) + var sum = 0 + try { + rxObservable { + for (i in 0..100) { + send(i) + } + throw TestException() + }.collect { + sum += it + } + } catch (e: TestException) { + assertTrue(sum > 0) + finish(2) + } + } + + /** Tests the behavior of [collect] when the action throws. */ + @Test + fun testObservableCollectThrowingAction() = runTest { + expect(1) + var sum = 0 + val expectedSum = 5 + try { + var disposed = false + ObservableSource<Int> { observer -> + launch(Dispatchers.Default) { + observer.onSubscribe(object : Disposable { + override fun dispose() { + disposed = true + expect(expectedSum + 2) + } + + override fun isDisposed(): Boolean = disposed + }) + while (!disposed) { + observer.onNext(1) + } + } + }.collect { + expect(sum + 2) + sum += it + if (sum == expectedSum) { + throw TestException() + } + } + } catch (e: TestException) { + assertEquals(expectedSum, sum) + finish(expectedSum + 3) + } + } +}
\ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt index d6cdd3ca..fb3d0f69 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt @@ -8,6 +8,7 @@ import io.reactivex.exceptions.* import kotlinx.coroutines.* import org.junit.* import org.junit.Test +import java.util.concurrent.* import kotlin.test.* class ObservableExceptionHandlingTest : TestBase() { @@ -18,7 +19,7 @@ class ObservableExceptionHandlingTest : TestBase() { } private inline fun <reified T : Throwable> handler(expect: Int) = { t: Throwable -> - assertTrue(t is UndeliverableException && t.cause is T) + assertTrue(t is UndeliverableException && t.cause is T, "$t") expect(expect) } @@ -38,8 +39,8 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) { - rxObservable<Int>(Dispatchers.Unconfined) { + fun testFatalException() = withExceptionHandler({ expectUnreached() }) { + rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) { expect(1) throw LinkageError() }.subscribe({ @@ -47,7 +48,7 @@ class ObservableExceptionHandlingTest : TestBase() { }, { expect(2) }) - finish(4) + finish(3) } @Test @@ -66,7 +67,7 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) { + fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) { rxObservable<Int>(Dispatchers.Unconfined) { expect(1) throw LinkageError() @@ -75,20 +76,28 @@ class ObservableExceptionHandlingTest : TestBase() { .subscribe({ expectUnreached() }, { - expect(2) // Fatal exception is not reported in onError + expect(2) // Fatal exceptions are not treated in a special manner }) - finish(4) + finish(3) } @Test - fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) { + fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) { + val latch = CountDownLatch(1) rxObservable(Dispatchers.Unconfined) { expect(1) - send(Unit) + val result = trySend(Unit) + val exception = result.exceptionOrNull() + assertTrue(exception is UndeliverableException) + assertTrue(exception.cause is LinkageError) + assertTrue(isClosedForSend) + expect(4) + latch.countDown() }.subscribe({ expect(2) throw LinkageError() - }, { expect(3) }) // Unreached because fatal errors are rethrown + }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw. + latch.await() finish(5) } @@ -100,7 +109,7 @@ class ObservableExceptionHandlingTest : TestBase() { }.subscribe({ expect(2) throw TestException() - }, { expect(3) }) // not reported to onError because came from the subscribe itself + }, { expect(3) }) finish(4) } @@ -119,7 +128,7 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) { + fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) { rxObservable(Dispatchers.Unconfined) { expect(1) send(Unit) @@ -128,7 +137,7 @@ class ObservableExceptionHandlingTest : TestBase() { .subscribe({ expect(2) throw LinkageError() - }, { expect(3) }) - finish(5) + }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw. + finish(4) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt index 4454190f..e246407a 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt @@ -5,7 +5,9 @@ package kotlinx.coroutines.rx2 import io.reactivex.* +import io.reactivex.disposables.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* @@ -101,7 +103,7 @@ class ObservableSingleTest : TestBase() { @Test fun testAwaitFirstOrNull() { - val observable = rxObservable<String> { + val observable = rxObservable { send(Observable.empty<String>().awaitFirstOrNull() ?: "OK") } @@ -154,6 +156,32 @@ class ObservableSingleTest : TestBase() { } } + /** Tests that calls to [awaitFirst] (and, thus, the other methods) throw [CancellationException] and dispose of + * the subscription when their [Job] is cancelled. */ + @Test + fun testAwaitCancellation() = runTest { + expect(1) + val observable = ObservableSource<Int> { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + observable.awaitFirst() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) + } + + @Test fun testExceptionFromObservable() { val observable = rxObservable { diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt index 159f3729..0253fced 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt @@ -26,7 +26,7 @@ class ObservableSourceAsFlowStressTest : TestBase() { val latch = Channel<Unit>(1) var i = 0 val observable = Observable.interval(100L, TimeUnit.MICROSECONDS) - .doOnNext { if (++i > 100) latch.offer(Unit) } + .doOnNext { if (++i > 100) latch.trySend(Unit) } val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default)) latch.receive() job.cancelAndJoin() diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt index 3cd3bbff..2c22cbf0 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt @@ -1,12 +1,14 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.rx2 import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* import org.junit.Test +import kotlin.onSuccess import kotlin.test.* class ObservableSubscriptionSelectTest : TestBase() { @@ -18,27 +20,27 @@ class ObservableSubscriptionSelectTest : TestBase() { var a = 0 var b = 0 // open two subs - val channelA = source.openSubscription() - val channelB = source.openSubscription() + val channelA = source.toChannel() + val channelB = source.toChannel() loop@ while (true) { val done: Int = select { - channelA.onReceiveOrNull { - if (it != null) assertEquals(a++, it) - if (it == null) 0 else 1 + channelA.onReceiveCatching { result -> + result.onSuccess { assertEquals(a++, it) } + if (result.isSuccess) 1 else 0 } - channelB.onReceiveOrNull { - if (it != null) assertEquals(b++, it) - if (it == null) 0 else 2 + channelB.onReceiveCatching { result -> + result.onSuccess { assertEquals(b++, it) } + if (result.isSuccess) 2 else 0 } } when (done) { 0 -> break@loop 1 -> { - val r = channelB.receiveOrNull() + val r = channelB.receiveCatching().getOrNull() if (r != null) assertEquals(b++, r) } 2 -> { - val r = channelA.receiveOrNull() + val r = channelA.receiveCatching().getOrNull() if (r != null) assertEquals(a++, r) } } @@ -48,4 +50,4 @@ class ObservableSubscriptionSelectTest : TestBase() { // should receive one of them fully assertTrue(a == n || b == n) } -}
\ No newline at end of file +} diff --git a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt index c66188a1..b359d963 100644 --- a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt @@ -9,6 +9,7 @@ import io.reactivex.disposables.* import io.reactivex.exceptions.* import io.reactivex.functions.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* @@ -98,6 +99,31 @@ class SingleTest : TestBase() { assertEquals("OK", Single.just("O").await() + "K") } + /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their + * [Job] is cancelled. */ + @Test + fun testSingleAwaitCancellation() = runTest { + expect(1) + val single = SingleSource<Int> { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + single.await() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) + } + @Test fun testSingleEmitAndAwait() { val single = rxSingle { @@ -221,7 +247,7 @@ class SingleTest : TestBase() { fun testFatalExceptionInSingle() = runTest { rxSingle(Dispatchers.Unconfined) { throw LinkageError() - }.subscribe({ _, e -> assertTrue(e is LinkageError); expect(1) }) + }.subscribe { _, e -> assertTrue(e is LinkageError); expect(1) } finish(2) } |