aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-rx3
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-rx3')
-rw-r--r--reactive/kotlinx-coroutines-rx3/README.md34
-rw-r--r--reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api2
-rw-r--r--reactive/kotlinx-coroutines-rx3/build.gradle20
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxAwait.kt145
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt1
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxChannel.kt14
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt9
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxConvert.kt18
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt1
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt9
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxObservable.kt116
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxSingle.kt9
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt30
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt16
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt15
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt98
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/ObservableCollectTest.kt68
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt37
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt30
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt2
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt20
-rw-r--r--reactive/kotlinx-coroutines-rx3/test/SingleTest.kt28
22 files changed, 518 insertions, 204 deletions
diff --git a/reactive/kotlinx-coroutines-rx3/README.md b/reactive/kotlinx-coroutines-rx3/README.md
index f9d3c5a8..1530558c 100644
--- a/reactive/kotlinx-coroutines-rx3/README.md
+++ b/reactive/kotlinx-coroutines-rx3/README.md
@@ -25,8 +25,8 @@ Suspending extension functions and suspending iteration:
| **Name** | **Description**
| -------- | ---------------
| [CompletableSource.await][io.reactivex.rxjava3.core.CompletableSource.await] | Awaits for completion of the completable value
-| [MaybeSource.await][io.reactivex.rxjava3.core.MaybeSource.await] | Awaits for the value of the maybe and returns it or null
-| [MaybeSource.awaitOrDefault][io.reactivex.rxjava3.core.MaybeSource.awaitOrDefault] | Awaits for the value of the maybe and returns it or default
+| [MaybeSource.awaitSingle][io.reactivex.rxjava3.core.MaybeSource.awaitSingle] | Awaits for the value of the maybe and returns it or throws an exception
+| [MaybeSource.awaitSingleOrNull][io.reactivex.rxjava3.core.MaybeSource.awaitSingleOrNull] | Awaits for the value of the maybe and returns it or null
| [SingleSource.await][io.reactivex.rxjava3.core.SingleSource.await] | Awaits for completion of the single value and returns it
| [ObservableSource.awaitFirst][io.reactivex.rxjava3.core.ObservableSource.awaitFirst] | Awaits for the first value from the given observable
| [ObservableSource.awaitFirstOrDefault][io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrDefault] | Awaits for the first value from the given observable or default
@@ -69,21 +69,21 @@ Conversion functions:
[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-single.html
[rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-observable.html
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-flowable.html
-[Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.flow.-flow/as-flowable.html
-[Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.flow.-flow/as-observable.html
-[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/as-flow.html
-[io.reactivex.rxjava3.core.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-completable-source/await.html
-[io.reactivex.rxjava3.core.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-maybe-source/await.html
-[io.reactivex.rxjava3.core.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-maybe-source/await-or-default.html
-[io.reactivex.rxjava3.core.SingleSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-single-source/await.html
-[io.reactivex.rxjava3.core.ObservableSource.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first.html
-[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first-or-default.html
-[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first-or-else.html
-[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first-or-null.html
-[io.reactivex.rxjava3.core.ObservableSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-single.html
-[kotlinx.coroutines.Job.asCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.-job/as-completable.html
-[kotlinx.coroutines.Deferred.asSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.-deferred/as-single.html
-[io.reactivex.rxjava3.core.Scheduler.asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-scheduler/as-coroutine-dispatcher.html
+[Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/as-flowable.html
+[Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/as-observable.html
+[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/as-flow.html
+[io.reactivex.rxjava3.core.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/await.html
+[io.reactivex.rxjava3.core.MaybeSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/await-single.html
+[io.reactivex.rxjava3.core.MaybeSource.awaitSingleOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/await-single-or-null.html
+[io.reactivex.rxjava3.core.SingleSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/await.html
+[io.reactivex.rxjava3.core.ObservableSource.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/await-first.html
+[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/await-first-or-default.html
+[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/await-first-or-else.html
+[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/await-first-or-null.html
+[io.reactivex.rxjava3.core.ObservableSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/await-single.html
+[kotlinx.coroutines.Job.asCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/as-completable.html
+[kotlinx.coroutines.Deferred.asSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/as-single.html
+[io.reactivex.rxjava3.core.Scheduler.asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/as-coroutine-dispatcher.html
<!--- END -->
diff --git a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
index 6d2dd63d..f6f3f1d0 100644
--- a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
+++ b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api
@@ -8,7 +8,9 @@ public final class kotlinx/coroutines/rx3/RxAwaitKt {
public static final fun awaitFirstOrNull (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitLast (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitOrDefault (Lio/reactivex/rxjava3/core/MaybeSource;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun awaitSingle (Lio/reactivex/rxjava3/core/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitSingle (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun awaitSingleOrNull (Lio/reactivex/rxjava3/core/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
public final class kotlinx/coroutines/rx3/RxChannelKt {
diff --git a/reactive/kotlinx-coroutines-rx3/build.gradle b/reactive/kotlinx-coroutines-rx3/build.gradle
index a5de40d8..15ef66da 100644
--- a/reactive/kotlinx-coroutines-rx3/build.gradle
+++ b/reactive/kotlinx-coroutines-rx3/build.gradle
@@ -1,13 +1,15 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
+import org.jetbrains.dokka.gradle.DokkaTaskPartial
+
targetCompatibility = JavaVersion.VERSION_1_8
dependencies {
- compile project(':kotlinx-coroutines-reactive')
- testCompile project(':kotlinx-coroutines-reactive').sourceSets.test.output
- testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"
- compile "io.reactivex.rxjava3:rxjava:$rxjava3_version"
+ api project(':kotlinx-coroutines-reactive')
+ testImplementation project(':kotlinx-coroutines-reactive').sourceSets.test.output
+ testImplementation "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"
+ api "io.reactivex.rxjava3:rxjava:$rxjava3_version"
}
compileTestKotlin {
@@ -18,10 +20,12 @@ compileKotlin {
kotlinOptions.jvmTarget = "1.8"
}
-tasks.withType(dokka.getClass()) {
- externalDocumentationLink {
- url = new URL('http://reactivex.io/RxJava/3.x/javadoc/')
- packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
+tasks.withType(DokkaTaskPartial.class) {
+ dokkaSourceSets.configureEach {
+ externalDocumentationLink {
+ url.set(new URL('http://reactivex.io/RxJava/3.x/javadoc/'))
+ packageListUrl.set(projectDir.toPath().resolve("package.list").toUri().toURL())
+ }
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt
index 8ac0a10c..2a14cf7c 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt
@@ -15,11 +15,12 @@ import kotlin.coroutines.*
// ------------------------ CompletableSource ------------------------
/**
- * Awaits for completion of this completable without blocking a thread.
- * Returns `Unit` or throws the corresponding exception if this completable had produced error.
+ * Awaits for completion of this completable without blocking the thread.
+ * Returns `Unit`, or throws the corresponding exception if this completable produces an error.
*
* This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
- * suspending function is suspended, this function immediately resumes with [CancellationException].
+ * suspending function is suspended, this function immediately resumes with [CancellationException] and disposes of its
+ * subscription.
*/
public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
subscribe(object : CompletableObserver {
@@ -32,6 +33,37 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
// ------------------------ MaybeSource ------------------------
/**
+ * Awaits for completion of the [MaybeSource] without blocking the thread.
+ * Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this
+ * [MaybeSource] produces an error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
+ * function immediately resumes with [CancellationException] and disposes of its subscription.
+ */
+@Suppress("UNCHECKED_CAST")
+public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
+ subscribe(object : MaybeObserver<T> {
+ override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
+ override fun onComplete() { cont.resume(null) }
+ override fun onSuccess(t: T) { cont.resume(t) }
+ override fun onError(error: Throwable) { cont.resumeWithException(error) }
+ })
+}
+
+/**
+ * Awaits for completion of the [MaybeSource] without blocking the thread.
+ * Returns the resulting value, or throws if either no value is produced or this [MaybeSource] produces an error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
+ * function immediately resumes with [CancellationException] and disposes of its subscription.
+ *
+ * @throws NoSuchElementException if no elements were produced by this [MaybeSource].
+ */
+public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
+
+/**
* Awaits for completion of the maybe without blocking a thread.
* Returns the resulting value, null if no value was produced or throws the corresponding exception if this
* maybe had produced error.
@@ -39,9 +71,20 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
+ *
+ * ### Deprecation
+ *
+ * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of
+ * a value, as opposed to throwing in such case.
+ *
+ * @suppress
*/
-@Suppress("UNCHECKED_CAST")
-public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).awaitOrDefault(null)
+@Deprecated(
+ message = "Deprecated in favor of awaitSingleOrNull()",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("this.awaitSingleOrNull()")
+) // Warning since 1.5, error in 1.6, hidden in 1.7
+public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()
/**
* Awaits for completion of the maybe without blocking a thread.
@@ -51,25 +94,30 @@ public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).aw
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
+ *
+ * ### Deprecation
+ *
+ * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for
+ * details).
+ *
+ * @suppress
*/
-public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont ->
- subscribe(object : MaybeObserver<T> {
- override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
- override fun onComplete() { cont.resume(default) }
- override fun onSuccess(t: T) { cont.resume(t) }
- override fun onError(error: Throwable) { cont.resumeWithException(error) }
- })
-}
+@Deprecated(
+ message = "Deprecated in favor of awaitSingleOrNull()",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
+) // Warning since 1.5, error in 1.6, hidden in 1.7
+public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default
// ------------------------ SingleSource ------------------------
/**
- * Awaits for completion of the single value without blocking a thread.
- * Returns the resulting value or throws the corresponding exception if this single had produced error.
+ * Awaits for completion of the single value response without blocking the thread.
+ * Returns the resulting value, or throws the corresponding exception if this response produces an error.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
subscribe(object : SingleObserver<T> {
@@ -82,69 +130,73 @@ public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine
// ------------------------ ObservableSource ------------------------
/**
- * Awaits for the first value from the given observable without blocking a thread.
- * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the first value from the given [Observable] without blocking the thread and returns the resulting value, or,
+ * if the observable has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*
- * @throws NoSuchElementException if observable does not emit any value
+ * @throws NoSuchElementException if the observable does not emit any value
*/
public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
/**
- * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
- * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without
+ * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
+ * corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
/**
- * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
- * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the
+ * thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding
+ * exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
/**
- * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
- * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted,
+ * without blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws
+ * the corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*/
-public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
+public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T =
+ awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
/**
- * Awaits for the last value from the given observable without blocking a thread.
- * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the last value from the given [Observable] without blocking the thread and
+ * returns the resulting value, or, if this observable has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*
- * @throws NoSuchElementException if observable does not emit any value
+ * @throws NoSuchElementException if the observable does not emit any value
*/
public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
/**
- * Awaits for the single value from the given observable without blocking a thread.
- * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or,
+ * if this observable has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*
- * @throws NoSuchElementException if observable does not emit any value
- * @throws IllegalArgumentException if observable emits more than one value
+ * @throws NoSuchElementException if the observable does not emit any value
+ * @throws IllegalArgumentException if the observable emits more than one value
*/
public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
@@ -218,3 +270,4 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
}
})
}
+
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt b/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt
index 29951598..1017b112 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt
@@ -20,6 +20,7 @@ internal fun handleUndeliverableException(cause: Throwable, context: CoroutineCo
try {
RxJavaPlugins.onError(cause)
} catch (e: Throwable) {
+ cause.addSuppressed(e)
handleCoroutineException(context, cause)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt
index 76333f2e..21238d24 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt
@@ -9,6 +9,7 @@ import io.reactivex.rxjava3.disposables.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*
+import kotlinx.coroutines.flow.*
/**
* Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it.
@@ -40,14 +41,18 @@ internal fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
/**
* Subscribes to this [MaybeSource] and performs the specified action for each received element.
- * Cancels subscription if any exception happens during collect.
+ *
+ * If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown from
+ * [collect].
*/
public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit =
openSubscription().consumeEach(action)
/**
* Subscribes to this [ObservableSource] and performs the specified action for each received element.
- * Cancels subscription if any exception happens during collect.
+ *
+ * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
+ * [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect].
*/
public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit =
openSubscription().consumeEach(action)
@@ -69,11 +74,12 @@ private class SubscriptionChannel<T> :
}
override fun onSuccess(t: T) {
- offer(t)
+ trySend(t)
+ close(cause = null)
}
override fun onNext(t: T) {
- offer(t)
+ trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
}
override fun onComplete() {
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt
index f4c5d7e7..47cc6ad3 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt
@@ -39,7 +39,7 @@ private fun rxCompletableInternal(
private class RxCompletableCoroutine(
parentContext: CoroutineContext,
private val subscriber: CompletableEmitter
-) : AbstractCoroutine<Unit>(parentContext, true) {
+) : AbstractCoroutine<Unit>(parentContext, false, true) {
override fun onCompleted(value: Unit) {
try {
subscriber.onComplete()
@@ -50,11 +50,12 @@ private class RxCompletableCoroutine(
override fun onCancelled(cause: Throwable, handled: Boolean) {
try {
- if (!subscriber.tryOnError(cause)) {
- handleUndeliverableException(cause, context)
+ if (subscriber.tryOnError(cause)) {
+ return
}
} catch (e: Throwable) {
- handleUndeliverableException(e, context)
+ cause.addSuppressed(e)
}
+ handleUndeliverableException(cause, context)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
index 63e30f26..b4693a55 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
@@ -26,7 +26,6 @@ import kotlin.coroutines.*
*
* @param context -- the coroutine context from which the resulting completable is going to be signalled
*/
-@ExperimentalCoroutinesApi
public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) {
this@asCompletable.join()
}
@@ -43,7 +42,6 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet
*
* @param context -- the coroutine context from which the resulting maybe is going to be signalled
*/
-@ExperimentalCoroutinesApi
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) {
this@asMaybe.await()
}
@@ -60,7 +58,6 @@ public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMay
*
* @param context -- the coroutine context from which the resulting single is going to be signalled
*/
-@ExperimentalCoroutinesApi
public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle(context) {
this@asSingle.await()
}
@@ -75,17 +72,20 @@ public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T>
* resulting flow to specify a user-defined value and to control what happens when data is produced faster
* than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
*/
-@ExperimentalCoroutinesApi
public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
val disposableRef = AtomicReference<Disposable>()
val observer = object : Observer<T> {
override fun onComplete() { close() }
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
override fun onNext(t: T) {
+ /*
+ * Channel was closed by the downstream, so the exception (if any)
+ * also was handled by the same downstream
+ */
try {
- sendBlocking(t)
- } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
- // Is handled by the downstream flow
+ trySendBlocking(t)
+ } catch (e: InterruptedException) {
+ // RxJava interrupts the source
}
}
override fun onError(e: Throwable) { close(e) }
@@ -104,7 +104,6 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
-@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
/*
* ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
@@ -137,10 +136,10 @@ public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCorout
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
-@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
Flowable.fromPublisher(asPublisher(context))
+/** @suppress */
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@@ -148,6 +147,7 @@ public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutin
public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
asFlowable(context)
+/** @suppress */
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt b/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt
index 445a6140..9357f283 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt
@@ -28,7 +28,6 @@ import kotlin.coroutines.*
*
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
*/
-@ExperimentalCoroutinesApi
public fun <T: Any> rxFlowable(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt
index ca1d5b59..12d0197b 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt
@@ -40,7 +40,7 @@ private fun <T> rxMaybeInternal(
private class RxMaybeCoroutine<T>(
parentContext: CoroutineContext,
private val subscriber: MaybeEmitter<T>
-) : AbstractCoroutine<T>(parentContext, true) {
+) : AbstractCoroutine<T>(parentContext, false, true) {
override fun onCompleted(value: T) {
try {
if (value == null) subscriber.onComplete() else subscriber.onSuccess(value)
@@ -51,11 +51,12 @@ private class RxMaybeCoroutine<T>(
override fun onCancelled(cause: Throwable, handled: Boolean) {
try {
- if (!subscriber.tryOnError(cause)) {
- handleUndeliverableException(cause, context)
+ if (subscriber.tryOnError(cause)) {
+ return
}
} catch (e: Throwable) {
- handleUndeliverableException(e, context)
+ cause.addSuppressed(e)
}
+ handleUndeliverableException(cause, context)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
index 7bd07750..57007bbd 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
@@ -12,6 +12,7 @@ import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
+import kotlinx.coroutines.internal.*
/**
* Creates cold [observable][Observable] that will run a given [block] in a coroutine.
@@ -29,7 +30,6 @@ import kotlin.coroutines.*
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
*/
-@ExperimentalCoroutinesApi
public fun <T : Any> rxObservable(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
@@ -54,39 +54,35 @@ private const val OPEN = 0 // open channel, still working
private const val CLOSED = -1 // closed, but have not signalled onCompleted/onError yet
private const val SIGNALLED = -2 // already signalled subscriber onCompleted/onError
-private class RxObservableCoroutine<T: Any>(
+private class RxObservableCoroutine<T : Any>(
parentContext: CoroutineContext,
private val subscriber: ObservableEmitter<T>
-) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> {
+) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> {
override val channel: SendChannel<T> get() = this
- // Mutex is locked when while subscriber.onXXX is being invoked
+ // Mutex is locked while subscriber.onXXX is being invoked
private val mutex = Mutex()
private val _signal = atomic(OPEN)
- override val isClosedForSend: Boolean get() = isCompleted
- override val isFull: Boolean = mutex.isLocked
+ override val isClosedForSend: Boolean get() = !isActive
override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
override fun invokeOnClose(handler: (Throwable?) -> Unit) =
throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose")
- override fun offer(element: T): Boolean {
- if (!mutex.tryLock()) return false
- doLockedNext(element)
- return true
- }
+ override fun trySend(element: T): ChannelResult<Unit> =
+ if (!mutex.tryLock()) {
+ ChannelResult.failure()
+ } else {
+ when (val throwable = doLockedNext(element)) {
+ null -> ChannelResult.success(Unit)
+ else -> ChannelResult.closed(throwable)
+ }
+ }
public override suspend fun send(element: T) {
- // fast-path -- try send without suspension
- if (offer(element)) return
- // slow-path does suspend
- return sendSuspend(element)
- }
-
- private suspend fun sendSuspend(element: T) {
mutex.lock()
- doLockedNext(element)
+ doLockedNext(element)?.let { throw it }
}
override val onSend: SelectClause2<T, SendChannel<T>>
@@ -94,30 +90,39 @@ private class RxObservableCoroutine<T: Any>(
// registerSelectSend
@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
- override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
+ override fun <R> registerSelectClause2(
+ select: SelectInstance<R>,
+ element: T,
+ block: suspend (SendChannel<T>) -> R
+ ) {
mutex.onLock.registerSelectClause2(select, null) {
- doLockedNext(element)
+ doLockedNext(element)?.let { throw it }
block(this)
}
}
// assert: mutex.isLocked()
- private fun doLockedNext(elem: T) {
+ private fun doLockedNext(elem: T): Throwable? {
// check if already closed for send
if (!isActive) {
doLockedSignalCompleted(completionCause, completionCauseHandled)
- throw getCancellationException()
+ return getCancellationException()
}
// notify subscriber
try {
subscriber.onNext(elem)
} catch (e: Throwable) {
- // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it
- // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
- // this failure is essentially equivalent to a failure of a child coroutine.
- cancelCoroutine(e)
- mutex.unlock()
- throw e
+ val cause = UndeliverableException(e)
+ val causeDelivered = close(cause)
+ unlockAndCheckCompleted()
+ return if (causeDelivered) {
+ // `cause` is the reason this channel is closed
+ cause
+ } else {
+ // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception.
+ handleUndeliverableException(cause, context)
+ getCancellationException()
+ }
}
/*
* There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
@@ -126,6 +131,7 @@ private class RxObservableCoroutine<T: Any>(
* We have to recheck `isCompleted` after `unlock` anyway.
*/
unlockAndCheckCompleted()
+ return null
}
private fun unlockAndCheckCompleted() {
@@ -139,33 +145,31 @@ private class RxObservableCoroutine<T: Any>(
private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
// cancellation failures
try {
- if (_signal.value >= CLOSED) {
- _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
+ if (_signal.value == SIGNALLED)
+ return
+ _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
+ @Suppress("INVISIBLE_MEMBER")
+ val unwrappedCause = cause?.let { unwrap(it) }
+ if (unwrappedCause == null) {
try {
- if (cause != null && cause !is CancellationException) {
- /*
- * Reactive frameworks have two types of exceptions: regular and fatal.
- * Regular are passed to onError.
- * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297).
- * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
- * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
- * thrown by subscriber or upstream).
- * To make behaviour consistent and least surprising, we always handle fatal exceptions
- * by coroutines machinery, anyway, they should not be present in regular program flow,
- * thus our goal here is just to expose it as soon as possible.
- */
- subscriber.tryOnError(cause)
- if (!handled && cause.isFatal()) {
- handleUndeliverableException(cause, context)
- }
- }
- else {
- subscriber.onComplete()
- }
- } catch (e: Throwable) {
- // Unhandled exception (cannot handle in other way, since we are already complete)
+ subscriber.onComplete()
+ } catch (e: Exception) {
handleUndeliverableException(e, context)
}
+ } else if (unwrappedCause is UndeliverableException && !handled) {
+ /** Such exceptions are not reported to `onError`, as, according to the reactive specifications,
+ * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already
+ * cancelled. */
+ handleUndeliverableException(cause, context)
+ } else if (unwrappedCause !== getCancellationException() || !subscriber.isDisposed) {
+ try {
+ /** If the subscriber is already in a terminal state, the error will be signalled to
+ * `RxJavaPlugins.onError`. */
+ subscriber.onError(cause)
+ } catch (e: Exception) {
+ cause.addSuppressed(e)
+ handleUndeliverableException(cause, context)
+ }
}
} finally {
mutex.unlock()
@@ -187,9 +191,3 @@ private class RxObservableCoroutine<T: Any>(
}
}
-internal fun Throwable.isFatal() = try {
- Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode
- false
-} catch (e: Throwable) {
- true
-}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt
index f4d07fb4..e7678f0d 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt
@@ -39,7 +39,7 @@ private fun <T : Any> rxSingleInternal(
private class RxSingleCoroutine<T: Any>(
parentContext: CoroutineContext,
private val subscriber: SingleEmitter<T>
-) : AbstractCoroutine<T>(parentContext, true) {
+) : AbstractCoroutine<T>(parentContext, false, true) {
override fun onCompleted(value: T) {
try {
subscriber.onSuccess(value)
@@ -50,11 +50,12 @@ private class RxSingleCoroutine<T: Any>(
override fun onCancelled(cause: Throwable, handled: Boolean) {
try {
- if (!subscriber.tryOnError(cause)) {
- handleUndeliverableException(cause, context)
+ if (subscriber.tryOnError(cause)) {
+ return
}
} catch (e: Throwable) {
- handleUndeliverableException(e, context)
+ cause.addSuppressed(e)
}
+ handleUndeliverableException(cause, context)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt
index e5399d16..cfdb6d41 100644
--- a/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt
@@ -98,6 +98,31 @@ class CompletableTest : TestBase() {
}
}
+ /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their [Job] is
+ * cancelled. */
+ @Test
+ fun testAwaitCancellation() = runTest {
+ expect(1)
+ val completable = CompletableSource { s ->
+ s.onSubscribe(object: Disposable {
+ override fun dispose() { expect(4) }
+ override fun isDisposed(): Boolean { expectUnreached(); return false }
+ })
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ try {
+ expect(2)
+ completable.await()
+ } catch (e: CancellationException) {
+ expect(5)
+ throw e
+ }
+ }
+ expect(3)
+ job.cancelAndJoin()
+ finish(6)
+ }
+
@Test
fun testSuppressedException() = runTest {
val completable = rxCompletable(currentDispatcher()) {
@@ -119,7 +144,7 @@ class CompletableTest : TestBase() {
}
@Test
- fun testUnhandledException() = runTest() {
+ fun testUnhandledException() = runTest {
expect(1)
var disposable: Disposable? = null
val handler = { e: Throwable ->
@@ -165,8 +190,7 @@ class CompletableTest : TestBase() {
withExceptionHandler(handler) {
rxCompletable(Dispatchers.Unconfined) {
expect(1)
- 42
- }.subscribe({ throw LinkageError() })
+ }.subscribe { throw LinkageError() }
finish(3)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt
index 8cbd7ee8..126cb818 100644
--- a/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt
@@ -38,16 +38,16 @@ class FlowableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) {
+ fun testFatalException() = withExceptionHandler({ expectUnreached() }) {
rxFlowable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
}.subscribe({
expectUnreached()
}, {
- expect(2) // Fatal exception is reported to both onError and CEH
+ expect(2) // Fatal exceptions are not treated as special
})
- finish(4)
+ finish(3)
}
@Test
@@ -66,7 +66,7 @@ class FlowableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) {
+ fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
rxFlowable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
@@ -77,19 +77,19 @@ class FlowableExceptionHandlingTest : TestBase() {
}, {
expect(2)
})
- finish(4)
+ finish(3)
}
@Test
- fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
rxFlowable(Dispatchers.Unconfined) {
expect(1)
send(Unit)
}.subscribe({
expect(2)
throw LinkageError()
- }, { expect(3) }) // Fatal exception is reported to both onError and CEH
- finish(5)
+ }, { expectUnreached() }) // Fatal exception is rethrown from `onNext` => the subscription is thought to be cancelled
+ finish(4)
}
@Test
diff --git a/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt
index 395672ce..1302124f 100644
--- a/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt
@@ -125,6 +125,21 @@ class IntegrationTest(
finish(3)
}
+ @Test
+ fun testObservableWithTimeout() = runTest {
+ val observable = rxObservable<Int> {
+ expect(2)
+ withTimeout(1) { delay(100) }
+ }
+ try {
+ expect(1)
+ observable.awaitFirstOrNull()
+ } catch (e: CancellationException) {
+ expect(3)
+ }
+ finish(4)
+ }
+
private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
var last = 0
observable.collect {
diff --git a/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt
index e0cec748..bea939ef 100644
--- a/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt
@@ -7,13 +7,12 @@ package kotlinx.coroutines.rx3
import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.disposables.*
import io.reactivex.rxjava3.exceptions.*
-import io.reactivex.rxjava3.functions.*
import io.reactivex.rxjava3.internal.functions.Functions.*
import kotlinx.coroutines.*
+import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
-import java.util.concurrent.CancellationException
import kotlin.test.*
class MaybeTest : TestBase() {
@@ -47,7 +46,7 @@ class MaybeTest : TestBase() {
null
}
expect(2)
- maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, Action {
+ maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, {
expect(5)
})
expect(3)
@@ -85,7 +84,7 @@ class MaybeTest : TestBase() {
expectUnreached()
}
expect(2)
- // nothing is called on a disposed rx3 maybe
+ // nothing is called on a disposed rx2 maybe
val sub = maybe.subscribe({
expectUnreached()
}, {
@@ -112,18 +111,45 @@ class MaybeTest : TestBase() {
@Test
fun testMaybeAwait() = runBlocking {
- assertEquals("OK", Maybe.just("O").await() + "K")
+ assertEquals("OK", Maybe.just("O").awaitSingleOrNull() + "K")
+ assertEquals("OK", Maybe.just("O").awaitSingle() + "K")
}
@Test
- fun testMaybeAwaitForNull() = runBlocking {
- assertNull(Maybe.empty<String>().await())
+ fun testMaybeAwaitForNull(): Unit = runBlocking {
+ assertNull(Maybe.empty<String>().awaitSingleOrNull())
+ assertFailsWith<NoSuchElementException> { Maybe.empty<String>().awaitSingle() }
+ }
+
+ /** Tests that calls to [awaitSingleOrNull] throw [CancellationException] and dispose of the subscription when their
+ * [Job] is cancelled. */
+ @Test
+ fun testMaybeAwaitCancellation() = runTest {
+ expect(1)
+ val maybe = MaybeSource<Int> { s ->
+ s.onSubscribe(object: Disposable {
+ override fun dispose() { expect(4) }
+ override fun isDisposed(): Boolean { expectUnreached(); return false }
+ })
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ try {
+ expect(2)
+ maybe.awaitSingleOrNull()
+ } catch (e: CancellationException) {
+ expect(5)
+ throw e
+ }
+ }
+ expect(3)
+ job.cancelAndJoin()
+ finish(6)
}
@Test
fun testMaybeEmitAndAwait() {
val maybe = rxMaybe {
- Maybe.just("O").await() + "K"
+ Maybe.just("O").awaitSingleOrNull() + "K"
}
checkMaybeValue(maybe) {
@@ -205,7 +231,7 @@ class MaybeTest : TestBase() {
@Test
fun testCancelledConsumer() = runTest {
expect(1)
- val maybe = rxMaybe<Int>(currentDispatcher()) {
+ val maybe = rxMaybe(currentDispatcher()) {
expect(4)
try {
delay(Long.MAX_VALUE)
@@ -228,6 +254,56 @@ class MaybeTest : TestBase() {
finish(7)
}
+ /** Tests the simple scenario where the Maybe doesn't output a value. */
+ @Test
+ fun testMaybeCollectEmpty() = runTest {
+ expect(1)
+ Maybe.empty<Int>().collect {
+ expectUnreached()
+ }
+ finish(2)
+ }
+
+ /** Tests the simple scenario where the Maybe doesn't output a value. */
+ @Test
+ fun testMaybeCollectSingle() = runTest {
+ expect(1)
+ Maybe.just("OK").collect {
+ assertEquals("OK", it)
+ expect(2)
+ }
+ finish(3)
+ }
+
+ /** Tests the behavior of [collect] when the Maybe raises an error. */
+ @Test
+ fun testMaybeCollectThrowingMaybe() = runTest {
+ expect(1)
+ try {
+ Maybe.error<Int>(TestException()).collect {
+ expectUnreached()
+ }
+ } catch (e: TestException) {
+ expect(2)
+ }
+ finish(3)
+ }
+
+ /** Tests the behavior of [collect] when the action throws. */
+ @Test
+ fun testMaybeCollectThrowingAction() = runTest {
+ expect(1)
+ try {
+ Maybe.just("OK").collect {
+ expect(2)
+ throw TestException()
+ }
+ } catch (e: TestException) {
+ expect(3)
+ }
+ finish(4)
+ }
+
@Test
fun testSuppressedException() = runTest {
val maybe = rxMaybe(currentDispatcher()) {
@@ -241,7 +317,7 @@ class MaybeTest : TestBase() {
}
}
try {
- maybe.await()
+ maybe.awaitSingleOrNull()
expectUnreached()
} catch (e: TestException) {
assertTrue(e.suppressed[0] is TestException2)
@@ -301,7 +377,7 @@ class MaybeTest : TestBase() {
rxMaybe(Dispatchers.Unconfined) {
expect(1)
42
- }.subscribe({ throw LinkageError() })
+ }.subscribe { throw LinkageError() }
finish(3)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableCollectTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableCollectTest.kt
new file mode 100644
index 00000000..680786f9
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableCollectTest.kt
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx3
+
+import io.reactivex.rxjava3.core.ObservableSource
+import io.reactivex.rxjava3.disposables.*
+import kotlinx.coroutines.*
+import org.junit.Test
+import kotlin.test.*
+
+class ObservableCollectTest: TestBase() {
+
+ /** Tests the behavior of [collect] when the publisher raises an error. */
+ @Test
+ fun testObservableCollectThrowingObservable() = runTest {
+ expect(1)
+ var sum = 0
+ try {
+ rxObservable {
+ for (i in 0..100) {
+ send(i)
+ }
+ throw TestException()
+ }.collect {
+ sum += it
+ }
+ } catch (e: TestException) {
+ assertTrue(sum > 0)
+ finish(2)
+ }
+ }
+
+ @Test
+ fun testObservableCollectThrowingAction() = runTest {
+ expect(1)
+ var sum = 0
+ val expectedSum = 5
+ try {
+ var disposed = false
+ ObservableSource<Int> { observer ->
+ launch(Dispatchers.Default) {
+ observer.onSubscribe(object : Disposable {
+ override fun dispose() {
+ disposed = true
+ expect(expectedSum + 2)
+ }
+
+ override fun isDisposed(): Boolean = disposed
+ })
+ while (!disposed) {
+ observer.onNext(1)
+ }
+ }
+ }.collect {
+ expect(sum + 2)
+ sum += it
+ if (sum == expectedSum) {
+ throw TestException()
+ }
+ }
+ } catch (e: TestException) {
+ assertEquals(expectedSum, sum)
+ finish(expectedSum + 3)
+ }
+ }
+} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt
index 1183b2ae..5ddb36ed 100644
--- a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt
@@ -8,6 +8,7 @@ import io.reactivex.rxjava3.exceptions.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
+import java.util.concurrent.*
import kotlin.test.*
class ObservableExceptionHandlingTest : TestBase() {
@@ -18,7 +19,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}
private inline fun <reified T : Throwable> handler(expect: Int) = { t: Throwable ->
- assertTrue(t is UndeliverableException && t.cause is T)
+ assertTrue(t is UndeliverableException && t.cause is T, "$t")
expect(expect)
}
@@ -38,8 +39,8 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) {
- rxObservable<Int>(Dispatchers.Unconfined) {
+ fun testFatalException() = withExceptionHandler({ expectUnreached() }) {
+ rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) {
expect(1)
throw LinkageError()
}.subscribe({
@@ -47,7 +48,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}, {
expect(2)
})
- finish(4)
+ finish(3)
}
@Test
@@ -66,7 +67,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) {
+ fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
rxObservable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
@@ -75,20 +76,28 @@ class ObservableExceptionHandlingTest : TestBase() {
.subscribe({
expectUnreached()
}, {
- expect(2) // Fatal exception is not reported in onError
+ expect(2) // Fatal exceptions are not treated in a special manner
})
- finish(4)
+ finish(3)
}
@Test
- fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
+ val latch = CountDownLatch(1)
rxObservable(Dispatchers.Unconfined) {
expect(1)
- send(Unit)
+ val result = trySend(Unit)
+ val exception = result.exceptionOrNull()
+ assertTrue(exception is UndeliverableException)
+ assertTrue(exception.cause is LinkageError)
+ assertTrue(isClosedForSend)
+ expect(4)
+ latch.countDown()
}.subscribe({
expect(2)
throw LinkageError()
- }, { expect(3) }) // Unreached because fatal errors are rethrown
+ }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw.
+ latch.await()
finish(5)
}
@@ -100,7 +109,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}.subscribe({
expect(2)
throw TestException()
- }, { expect(3) }) // not reported to onError because came from the subscribe itself
+ }, { expect(3) })
finish(4)
}
@@ -119,7 +128,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
rxObservable(Dispatchers.Unconfined) {
expect(1)
send(Unit)
@@ -128,7 +137,7 @@ class ObservableExceptionHandlingTest : TestBase() {
.subscribe({
expect(2)
throw LinkageError()
- }, { expect(3) })
- finish(5)
+ }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw.
+ finish(4)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt
index 2a3ce046..692f0144 100644
--- a/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt
@@ -5,7 +5,9 @@
package kotlinx.coroutines.rx3
import io.reactivex.rxjava3.core.*
+import io.reactivex.rxjava3.disposables.*
import kotlinx.coroutines.*
+import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
@@ -101,7 +103,7 @@ class ObservableSingleTest : TestBase() {
@Test
fun testAwaitFirstOrNull() {
- val observable = rxObservable<String> {
+ val observable = rxObservable {
send(Observable.empty<String>().awaitFirstOrNull() ?: "OK")
}
@@ -154,6 +156,32 @@ class ObservableSingleTest : TestBase() {
}
}
+ /** Tests that calls to [awaitFirst] (and, thus, the other methods) throw [CancellationException] and dispose of
+ * the subscription when their [Job] is cancelled. */
+ @Test
+ fun testAwaitCancellation() = runTest {
+ expect(1)
+ val observable = ObservableSource<Int> { s ->
+ s.onSubscribe(object: Disposable {
+ override fun dispose() { expect(4) }
+ override fun isDisposed(): Boolean { expectUnreached(); return false }
+ })
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ try {
+ expect(2)
+ observable.awaitFirst()
+ } catch (e: CancellationException) {
+ expect(5)
+ throw e
+ }
+ }
+ expect(3)
+ job.cancelAndJoin()
+ finish(6)
+ }
+
+
@Test
fun testExceptionFromObservable() {
val observable = rxObservable {
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt
index 431a7a78..01d6a20e 100644
--- a/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt
@@ -27,7 +27,7 @@ class ObservableSourceAsFlowStressTest : TestBase() {
val latch = Channel<Unit>(1)
var i = 0
val observable = Observable.interval(100L, TimeUnit.MICROSECONDS)
- .doOnNext { if (++i > 100) latch.offer(Unit) }
+ .doOnNext { if (++i > 100) latch.trySend(Unit) }
val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default))
latch.receive()
job.cancelAndJoin()
diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt
index 2f043161..58a54616 100644
--- a/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt
@@ -1,12 +1,14 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.rx3
import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import org.junit.Test
+import kotlin.onSuccess
import kotlin.test.*
class ObservableSubscriptionSelectTest : TestBase() {
@@ -22,23 +24,23 @@ class ObservableSubscriptionSelectTest : TestBase() {
val channelB = source.openSubscription()
loop@ while (true) {
val done: Int = select {
- channelA.onReceiveOrNull {
- if (it != null) assertEquals(a++, it)
- if (it == null) 0 else 1
+ channelA.onReceiveCatching { result ->
+ result.onSuccess { assertEquals(a++, it) }
+ if (result.isSuccess) 1 else 0
}
- channelB.onReceiveOrNull {
- if (it != null) assertEquals(b++, it)
- if (it == null) 0 else 2
+ channelB.onReceiveCatching { result ->
+ result.onSuccess { assertEquals(b++, it) }
+ if (result.isSuccess) 2 else 0
}
}
when (done) {
0 -> break@loop
1 -> {
- val r = channelB.receiveOrNull()
+ val r = channelB.receiveCatching().getOrNull()
if (r != null) assertEquals(b++, r)
}
2 -> {
- val r = channelA.receiveOrNull()
+ val r = channelA.receiveCatching().getOrNull()
if (r != null) assertEquals(a++, r)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt
index 46bcaf84..3e763aa5 100644
--- a/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt
@@ -9,6 +9,7 @@ import io.reactivex.rxjava3.disposables.*
import io.reactivex.rxjava3.exceptions.*
import io.reactivex.rxjava3.functions.*
import kotlinx.coroutines.*
+import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
@@ -98,6 +99,31 @@ class SingleTest : TestBase() {
assertEquals("OK", Single.just("O").await() + "K")
}
+ /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their
+ * [Job] is cancelled. */
+ @Test
+ fun testSingleAwaitCancellation() = runTest {
+ expect(1)
+ val single = SingleSource<Int> { s ->
+ s.onSubscribe(object: Disposable {
+ override fun dispose() { expect(4) }
+ override fun isDisposed(): Boolean { expectUnreached(); return false }
+ })
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ try {
+ expect(2)
+ single.await()
+ } catch (e: CancellationException) {
+ expect(5)
+ throw e
+ }
+ }
+ expect(3)
+ job.cancelAndJoin()
+ finish(6)
+ }
+
@Test
fun testSingleEmitAndAwait() {
val single = rxSingle {
@@ -221,7 +247,7 @@ class SingleTest : TestBase() {
fun testFatalExceptionInSingle() = runTest {
rxSingle(Dispatchers.Unconfined) {
throw LinkageError()
- }.subscribe({ _, e -> assertTrue(e is LinkageError); expect(1) })
+ }.subscribe { _, e -> assertTrue(e is LinkageError); expect(1) }
finish(2)
}