aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt')
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt133
1 files changed, 133 insertions, 0 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt
new file mode 100644
index 00000000..6d247cfa
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx2
+
+import kotlinx.coroutines.*
+import org.junit.*
+import org.junit.Test
+import kotlin.test.*
+
+class ObservableExceptionHandlingTest : TestBase() {
+
+ @Before
+ fun setup() {
+ ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
+ }
+
+ private inline fun <reified T : Throwable> ceh(expect: Int) = CoroutineExceptionHandler { _, t ->
+ assertTrue(t is T)
+ expect(expect)
+ }
+
+ private fun cehUnreached() = CoroutineExceptionHandler { _, _ -> expectUnreached() }
+
+ @Test
+ fun testException() = runTest {
+ rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) {
+ expect(1)
+ throw TestException()
+ }.subscribe({
+ expectUnreached()
+ }, {
+ expect(2) // Reported to onError
+ })
+ finish(3)
+ }
+
+ @Test
+ fun testFatalException() = runTest {
+ rxObservable<Int>(Dispatchers.Unconfined + ceh<LinkageError>(3)) {
+ expect(1)
+ throw LinkageError()
+ }.subscribe({
+ expectUnreached()
+ }, {
+ expect(2)
+ })
+ finish(4)
+ }
+
+ @Test
+ fun testExceptionAsynchronous() = runTest {
+ rxObservable<Int>(Dispatchers.Unconfined) {
+ expect(1)
+ throw TestException()
+ }.publish()
+ .refCount()
+ .subscribe({
+ expectUnreached()
+ }, {
+ expect(2) // Reported to onError
+ })
+ finish(3)
+ }
+
+ @Test
+ fun testFatalExceptionAsynchronous() = runTest {
+ rxObservable<Int>(Dispatchers.Unconfined + ceh<LinkageError>(3)) {
+ expect(1)
+ throw LinkageError()
+ }.publish()
+ .refCount()
+ .subscribe({
+ expectUnreached()
+ }, {
+ expect(2) // Fatal exception is not reported in onError
+ })
+ finish(4)
+ }
+
+ @Test
+ fun testFatalExceptionFromSubscribe() = runTest {
+ rxObservable(Dispatchers.Unconfined + ceh<LinkageError>(4)) {
+ expect(1)
+ send(Unit)
+ }.subscribe({
+ expect(2)
+ throw LinkageError()
+ }, { expect(3) }) // Unreached because fatal errors are rethrown
+ finish(5)
+ }
+
+ @Test
+ fun testExceptionFromSubscribe() = runTest {
+ rxObservable(Dispatchers.Unconfined) {
+ expect(1)
+ send(Unit)
+ }.subscribe({
+ expect(2)
+ throw TestException()
+ }, { expect(3) }) // not reported to onError because came from the subscribe itself
+ finish(4)
+ }
+
+ @Test
+ fun testAsynchronousExceptionFromSubscribe() = runTest {
+ rxObservable(Dispatchers.Unconfined) {
+ expect(1)
+ send(Unit)
+ }.publish()
+ .refCount()
+ .subscribe({
+ expect(2)
+ throw RuntimeException()
+ }, { expect(3) })
+ finish(4)
+ }
+
+ @Test
+ fun testAsynchronousFatalExceptionFromSubscribe() = runTest {
+ rxObservable(Dispatchers.Unconfined + ceh<LinkageError>(4)) {
+ expect(1)
+ send(Unit)
+ }.publish()
+ .refCount()
+ .subscribe({
+ expect(2)
+ throw LinkageError()
+ }, { expect(3) })
+ finish(5)
+ }
+}