diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-rx3')
22 files changed, 518 insertions, 204 deletions
diff --git a/reactive/kotlinx-coroutines-rx3/README.md b/reactive/kotlinx-coroutines-rx3/README.md index f9d3c5a8..1530558c 100644 --- a/reactive/kotlinx-coroutines-rx3/README.md +++ b/reactive/kotlinx-coroutines-rx3/README.md @@ -25,8 +25,8 @@ Suspending extension functions and suspending iteration: | **Name** | **Description** | -------- | --------------- | [CompletableSource.await][io.reactivex.rxjava3.core.CompletableSource.await] | Awaits for completion of the completable value -| [MaybeSource.await][io.reactivex.rxjava3.core.MaybeSource.await] | Awaits for the value of the maybe and returns it or null -| [MaybeSource.awaitOrDefault][io.reactivex.rxjava3.core.MaybeSource.awaitOrDefault] | Awaits for the value of the maybe and returns it or default +| [MaybeSource.awaitSingle][io.reactivex.rxjava3.core.MaybeSource.awaitSingle] | Awaits for the value of the maybe and returns it or throws an exception +| [MaybeSource.awaitSingleOrNull][io.reactivex.rxjava3.core.MaybeSource.awaitSingleOrNull] | Awaits for the value of the maybe and returns it or null | [SingleSource.await][io.reactivex.rxjava3.core.SingleSource.await] | Awaits for completion of the single value and returns it | [ObservableSource.awaitFirst][io.reactivex.rxjava3.core.ObservableSource.awaitFirst] | Awaits for the first value from the given observable | [ObservableSource.awaitFirstOrDefault][io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrDefault] | Awaits for the first value from the given observable or default @@ -69,21 +69,21 @@ Conversion functions: [rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-single.html [rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-observable.html [rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/rx-flowable.html -[Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.flow.-flow/as-flowable.html -[Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.flow.-flow/as-observable.html -[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/as-flow.html -[io.reactivex.rxjava3.core.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-completable-source/await.html -[io.reactivex.rxjava3.core.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-maybe-source/await.html -[io.reactivex.rxjava3.core.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-maybe-source/await-or-default.html -[io.reactivex.rxjava3.core.SingleSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-single-source/await.html -[io.reactivex.rxjava3.core.ObservableSource.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first.html -[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first-or-default.html -[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first-or-else.html -[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-first-or-null.html -[io.reactivex.rxjava3.core.ObservableSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-observable-source/await-single.html -[kotlinx.coroutines.Job.asCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.-job/as-completable.html -[kotlinx.coroutines.Deferred.asSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/kotlinx.coroutines.-deferred/as-single.html -[io.reactivex.rxjava3.core.Scheduler.asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/io.reactivex.rxjava3.core.-scheduler/as-coroutine-dispatcher.html +[Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/as-flowable.html +[Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/as-observable.html +[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/as-flow.html +[io.reactivex.rxjava3.core.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/await.html +[io.reactivex.rxjava3.core.MaybeSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/await-single.html +[io.reactivex.rxjava3.core.MaybeSource.awaitSingleOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/await-single-or-null.html +[io.reactivex.rxjava3.core.SingleSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/await.html +[io.reactivex.rxjava3.core.ObservableSource.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/await-first.html +[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/await-first-or-default.html +[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/await-first-or-else.html +[io.reactivex.rxjava3.core.ObservableSource.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/await-first-or-null.html +[io.reactivex.rxjava3.core.ObservableSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/await-single.html +[kotlinx.coroutines.Job.asCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/as-completable.html +[kotlinx.coroutines.Deferred.asSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/as-single.html +[io.reactivex.rxjava3.core.Scheduler.asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx3/kotlinx.coroutines.rx3/as-coroutine-dispatcher.html <!--- END --> diff --git a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api index 6d2dd63d..f6f3f1d0 100644 --- a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api +++ b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api @@ -8,7 +8,9 @@ public final class kotlinx/coroutines/rx3/RxAwaitKt { public static final fun awaitFirstOrNull (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun awaitLast (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun awaitOrDefault (Lio/reactivex/rxjava3/core/MaybeSource;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitSingle (Lio/reactivex/rxjava3/core/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun awaitSingle (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitSingleOrNull (Lio/reactivex/rxjava3/core/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public final class kotlinx/coroutines/rx3/RxChannelKt { diff --git a/reactive/kotlinx-coroutines-rx3/build.gradle b/reactive/kotlinx-coroutines-rx3/build.gradle index a5de40d8..15ef66da 100644 --- a/reactive/kotlinx-coroutines-rx3/build.gradle +++ b/reactive/kotlinx-coroutines-rx3/build.gradle @@ -1,13 +1,15 @@ /* * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ +import org.jetbrains.dokka.gradle.DokkaTaskPartial + targetCompatibility = JavaVersion.VERSION_1_8 dependencies { - compile project(':kotlinx-coroutines-reactive') - testCompile project(':kotlinx-coroutines-reactive').sourceSets.test.output - testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version" - compile "io.reactivex.rxjava3:rxjava:$rxjava3_version" + api project(':kotlinx-coroutines-reactive') + testImplementation project(':kotlinx-coroutines-reactive').sourceSets.test.output + testImplementation "org.reactivestreams:reactive-streams-tck:$reactive_streams_version" + api "io.reactivex.rxjava3:rxjava:$rxjava3_version" } compileTestKotlin { @@ -18,10 +20,12 @@ compileKotlin { kotlinOptions.jvmTarget = "1.8" } -tasks.withType(dokka.getClass()) { - externalDocumentationLink { - url = new URL('http://reactivex.io/RxJava/3.x/javadoc/') - packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL() +tasks.withType(DokkaTaskPartial.class) { + dokkaSourceSets.configureEach { + externalDocumentationLink { + url.set(new URL('http://reactivex.io/RxJava/3.x/javadoc/')) + packageListUrl.set(projectDir.toPath().resolve("package.list").toUri().toURL()) + } } } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt index 8ac0a10c..2a14cf7c 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt @@ -15,11 +15,12 @@ import kotlin.coroutines.* // ------------------------ CompletableSource ------------------------ /** - * Awaits for completion of this completable without blocking a thread. - * Returns `Unit` or throws the corresponding exception if this completable had produced error. + * Awaits for completion of this completable without blocking the thread. + * Returns `Unit`, or throws the corresponding exception if this completable produces an error. * * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this - * suspending function is suspended, this function immediately resumes with [CancellationException]. + * suspending function is suspended, this function immediately resumes with [CancellationException] and disposes of its + * subscription. */ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont -> subscribe(object : CompletableObserver { @@ -32,6 +33,37 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine // ------------------------ MaybeSource ------------------------ /** + * Awaits for completion of the [MaybeSource] without blocking the thread. + * Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this + * [MaybeSource] produces an error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this + * function immediately resumes with [CancellationException] and disposes of its subscription. + */ +@Suppress("UNCHECKED_CAST") +public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> + subscribe(object : MaybeObserver<T> { + override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } + override fun onComplete() { cont.resume(null) } + override fun onSuccess(t: T) { cont.resume(t) } + override fun onError(error: Throwable) { cont.resumeWithException(error) } + }) +} + +/** + * Awaits for completion of the [MaybeSource] without blocking the thread. + * Returns the resulting value, or throws if either no value is produced or this [MaybeSource] produces an error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this + * function immediately resumes with [CancellationException] and disposes of its subscription. + * + * @throws NoSuchElementException if no elements were produced by this [MaybeSource]. + */ +public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException() + +/** * Awaits for completion of the maybe without blocking a thread. * Returns the resulting value, null if no value was produced or throws the corresponding exception if this * maybe had produced error. @@ -39,9 +71,20 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. + * + * ### Deprecation + * + * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of + * a value, as opposed to throwing in such case. + * + * @suppress */ -@Suppress("UNCHECKED_CAST") -public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).awaitOrDefault(null) +@Deprecated( + message = "Deprecated in favor of awaitSingleOrNull()", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull()") +) // Warning since 1.5, error in 1.6, hidden in 1.7 +public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull() /** * Awaits for completion of the maybe without blocking a thread. @@ -51,25 +94,30 @@ public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).aw * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. + * + * ### Deprecation + * + * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for + * details). + * + * @suppress */ -public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont -> - subscribe(object : MaybeObserver<T> { - override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } - override fun onComplete() { cont.resume(default) } - override fun onSuccess(t: T) { cont.resume(t) } - override fun onError(error: Throwable) { cont.resumeWithException(error) } - }) -} +@Deprecated( + message = "Deprecated in favor of awaitSingleOrNull()", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default") +) // Warning since 1.5, error in 1.6, hidden in 1.7 +public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default // ------------------------ SingleSource ------------------------ /** - * Awaits for completion of the single value without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this single had produced error. + * Awaits for completion of the single value response without blocking the thread. + * Returns the resulting value, or throws the corresponding exception if this response produces an error. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont -> subscribe(object : SingleObserver<T> { @@ -82,69 +130,73 @@ public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine // ------------------------ ObservableSource ------------------------ /** - * Awaits for the first value from the given observable without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable] without blocking the thread and returns the resulting value, or, + * if the observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. * - * @throws NoSuchElementException if observable does not emit any value + * @throws NoSuchElementException if the observable does not emit any value */ public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST) /** - * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without + * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the + * corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default) /** - * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the + * thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding + * exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT) /** - * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted, + * without blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws + * the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ -public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue() +public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = + awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue() /** - * Awaits for the last value from the given observable without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the last value from the given [Observable] without blocking the thread and + * returns the resulting value, or, if this observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. * - * @throws NoSuchElementException if observable does not emit any value + * @throws NoSuchElementException if the observable does not emit any value */ public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST) /** - * Awaits for the single value from the given observable without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or, + * if this observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. * - * @throws NoSuchElementException if observable does not emit any value - * @throws IllegalArgumentException if observable emits more than one value + * @throws NoSuchElementException if the observable does not emit any value + * @throws IllegalArgumentException if the observable emits more than one value */ public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE) @@ -218,3 +270,4 @@ private suspend fun <T> ObservableSource<T>.awaitOne( } }) } + diff --git a/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt b/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt index 29951598..1017b112 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt @@ -20,6 +20,7 @@ internal fun handleUndeliverableException(cause: Throwable, context: CoroutineCo try { RxJavaPlugins.onError(cause) } catch (e: Throwable) { + cause.addSuppressed(e) handleCoroutineException(context, cause) } } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt index 76333f2e..21238d24 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt @@ -9,6 +9,7 @@ import io.reactivex.rxjava3.disposables.* import kotlinx.atomicfu.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.internal.* +import kotlinx.coroutines.flow.* /** * Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it. @@ -40,14 +41,18 @@ internal fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> { /** * Subscribes to this [MaybeSource] and performs the specified action for each received element. - * Cancels subscription if any exception happens during collect. + * + * If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown from + * [collect]. */ public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit = openSubscription().consumeEach(action) /** * Subscribes to this [ObservableSource] and performs the specified action for each received element. - * Cancels subscription if any exception happens during collect. + * + * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from + * [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect]. */ public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit = openSubscription().consumeEach(action) @@ -69,11 +74,12 @@ private class SubscriptionChannel<T> : } override fun onSuccess(t: T) { - offer(t) + trySend(t) + close(cause = null) } override fun onNext(t: T) { - offer(t) + trySend(t) // Safe to ignore return value here, expectedly racing with cancellation } override fun onComplete() { diff --git a/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt index f4c5d7e7..47cc6ad3 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt @@ -39,7 +39,7 @@ private fun rxCompletableInternal( private class RxCompletableCoroutine( parentContext: CoroutineContext, private val subscriber: CompletableEmitter -) : AbstractCoroutine<Unit>(parentContext, true) { +) : AbstractCoroutine<Unit>(parentContext, false, true) { override fun onCompleted(value: Unit) { try { subscriber.onComplete() @@ -50,11 +50,12 @@ private class RxCompletableCoroutine( override fun onCancelled(cause: Throwable, handled: Boolean) { try { - if (!subscriber.tryOnError(cause)) { - handleUndeliverableException(cause, context) + if (subscriber.tryOnError(cause)) { + return } } catch (e: Throwable) { - handleUndeliverableException(e, context) + cause.addSuppressed(e) } + handleUndeliverableException(cause, context) } } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt index 63e30f26..b4693a55 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt @@ -26,7 +26,6 @@ import kotlin.coroutines.* * * @param context -- the coroutine context from which the resulting completable is going to be signalled */ -@ExperimentalCoroutinesApi public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) { this@asCompletable.join() } @@ -43,7 +42,6 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet * * @param context -- the coroutine context from which the resulting maybe is going to be signalled */ -@ExperimentalCoroutinesApi public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) { this@asMaybe.await() } @@ -60,7 +58,6 @@ public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMay * * @param context -- the coroutine context from which the resulting single is going to be signalled */ -@ExperimentalCoroutinesApi public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle(context) { this@asSingle.await() } @@ -75,17 +72,20 @@ public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> * resulting flow to specify a user-defined value and to control what happens when data is produced faster * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. */ -@ExperimentalCoroutinesApi public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow { val disposableRef = AtomicReference<Disposable>() val observer = object : Observer<T> { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } override fun onNext(t: T) { + /* + * Channel was closed by the downstream, so the exception (if any) + * also was handled by the same downstream + */ try { - sendBlocking(t) - } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 - // Is handled by the downstream flow + trySendBlocking(t) + } catch (e: InterruptedException) { + // RxJava interrupts the source } } override fun onError(e: Throwable) { close(e) } @@ -104,7 +104,6 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow { * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ -@ExperimentalCoroutinesApi public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter -> /* * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if @@ -137,10 +136,10 @@ public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCorout * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ -@ExperimentalCoroutinesApi public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> = Flowable.fromPublisher(asPublisher(context)) +/** @suppress */ @Suppress("UNUSED") // KT-42513 @JvmOverloads // binary compatibility @JvmName("from") @@ -148,6 +147,7 @@ public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutin public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> = asFlowable(context) +/** @suppress */ @Suppress("UNUSED") // KT-42513 @JvmOverloads // binary compatibility @JvmName("from") diff --git a/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt b/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt index 445a6140..9357f283 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt @@ -28,7 +28,6 @@ import kotlin.coroutines.* * * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect */ -@ExperimentalCoroutinesApi public fun <T: Any> rxFlowable( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope<T>.() -> Unit diff --git a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt index ca1d5b59..12d0197b 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt @@ -40,7 +40,7 @@ private fun <T> rxMaybeInternal( private class RxMaybeCoroutine<T>( parentContext: CoroutineContext, private val subscriber: MaybeEmitter<T> -) : AbstractCoroutine<T>(parentContext, true) { +) : AbstractCoroutine<T>(parentContext, false, true) { override fun onCompleted(value: T) { try { if (value == null) subscriber.onComplete() else subscriber.onSuccess(value) @@ -51,11 +51,12 @@ private class RxMaybeCoroutine<T>( override fun onCancelled(cause: Throwable, handled: Boolean) { try { - if (!subscriber.tryOnError(cause)) { - handleUndeliverableException(cause, context) + if (subscriber.tryOnError(cause)) { + return } } catch (e: Throwable) { - handleUndeliverableException(e, context) + cause.addSuppressed(e) } + handleUndeliverableException(cause, context) } } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt index 7bd07750..57007bbd 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt @@ -12,6 +12,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import kotlin.coroutines.* +import kotlinx.coroutines.internal.* /** * Creates cold [observable][Observable] that will run a given [block] in a coroutine. @@ -29,7 +30,6 @@ import kotlin.coroutines.* * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. */ -@ExperimentalCoroutinesApi public fun <T : Any> rxObservable( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope<T>.() -> Unit @@ -54,39 +54,35 @@ private const val OPEN = 0 // open channel, still working private const val CLOSED = -1 // closed, but have not signalled onCompleted/onError yet private const val SIGNALLED = -2 // already signalled subscriber onCompleted/onError -private class RxObservableCoroutine<T: Any>( +private class RxObservableCoroutine<T : Any>( parentContext: CoroutineContext, private val subscriber: ObservableEmitter<T> -) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> { +) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> { override val channel: SendChannel<T> get() = this - // Mutex is locked when while subscriber.onXXX is being invoked + // Mutex is locked while subscriber.onXXX is being invoked private val mutex = Mutex() private val _signal = atomic(OPEN) - override val isClosedForSend: Boolean get() = isCompleted - override val isFull: Boolean = mutex.isLocked + override val isClosedForSend: Boolean get() = !isActive override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause) override fun invokeOnClose(handler: (Throwable?) -> Unit) = throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose") - override fun offer(element: T): Boolean { - if (!mutex.tryLock()) return false - doLockedNext(element) - return true - } + override fun trySend(element: T): ChannelResult<Unit> = + if (!mutex.tryLock()) { + ChannelResult.failure() + } else { + when (val throwable = doLockedNext(element)) { + null -> ChannelResult.success(Unit) + else -> ChannelResult.closed(throwable) + } + } public override suspend fun send(element: T) { - // fast-path -- try send without suspension - if (offer(element)) return - // slow-path does suspend - return sendSuspend(element) - } - - private suspend fun sendSuspend(element: T) { mutex.lock() - doLockedNext(element) + doLockedNext(element)?.let { throw it } } override val onSend: SelectClause2<T, SendChannel<T>> @@ -94,30 +90,39 @@ private class RxObservableCoroutine<T: Any>( // registerSelectSend @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE") - override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) { + override fun <R> registerSelectClause2( + select: SelectInstance<R>, + element: T, + block: suspend (SendChannel<T>) -> R + ) { mutex.onLock.registerSelectClause2(select, null) { - doLockedNext(element) + doLockedNext(element)?.let { throw it } block(this) } } // assert: mutex.isLocked() - private fun doLockedNext(elem: T) { + private fun doLockedNext(elem: T): Throwable? { // check if already closed for send if (!isActive) { doLockedSignalCompleted(completionCause, completionCauseHandled) - throw getCancellationException() + return getCancellationException() } // notify subscriber try { subscriber.onNext(elem) } catch (e: Throwable) { - // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it - // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery, - // this failure is essentially equivalent to a failure of a child coroutine. - cancelCoroutine(e) - mutex.unlock() - throw e + val cause = UndeliverableException(e) + val causeDelivered = close(cause) + unlockAndCheckCompleted() + return if (causeDelivered) { + // `cause` is the reason this channel is closed + cause + } else { + // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception. + handleUndeliverableException(cause, context) + getCancellationException() + } } /* * There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might @@ -126,6 +131,7 @@ private class RxObservableCoroutine<T: Any>( * We have to recheck `isCompleted` after `unlock` anyway. */ unlockAndCheckCompleted() + return null } private fun unlockAndCheckCompleted() { @@ -139,33 +145,31 @@ private class RxObservableCoroutine<T: Any>( private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) { // cancellation failures try { - if (_signal.value >= CLOSED) { - _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) + if (_signal.value == SIGNALLED) + return + _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) + @Suppress("INVISIBLE_MEMBER") + val unwrappedCause = cause?.let { unwrap(it) } + if (unwrappedCause == null) { try { - if (cause != null && cause !is CancellationException) { - /* - * Reactive frameworks have two types of exceptions: regular and fatal. - * Regular are passed to onError. - * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297). - * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether - * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was - * thrown by subscriber or upstream). - * To make behaviour consistent and least surprising, we always handle fatal exceptions - * by coroutines machinery, anyway, they should not be present in regular program flow, - * thus our goal here is just to expose it as soon as possible. - */ - subscriber.tryOnError(cause) - if (!handled && cause.isFatal()) { - handleUndeliverableException(cause, context) - } - } - else { - subscriber.onComplete() - } - } catch (e: Throwable) { - // Unhandled exception (cannot handle in other way, since we are already complete) + subscriber.onComplete() + } catch (e: Exception) { handleUndeliverableException(e, context) } + } else if (unwrappedCause is UndeliverableException && !handled) { + /** Such exceptions are not reported to `onError`, as, according to the reactive specifications, + * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already + * cancelled. */ + handleUndeliverableException(cause, context) + } else if (unwrappedCause !== getCancellationException() || !subscriber.isDisposed) { + try { + /** If the subscriber is already in a terminal state, the error will be signalled to + * `RxJavaPlugins.onError`. */ + subscriber.onError(cause) + } catch (e: Exception) { + cause.addSuppressed(e) + handleUndeliverableException(cause, context) + } } } finally { mutex.unlock() @@ -187,9 +191,3 @@ private class RxObservableCoroutine<T: Any>( } } -internal fun Throwable.isFatal() = try { - Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode - false -} catch (e: Throwable) { - true -} diff --git a/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt index f4d07fb4..e7678f0d 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt @@ -39,7 +39,7 @@ private fun <T : Any> rxSingleInternal( private class RxSingleCoroutine<T: Any>( parentContext: CoroutineContext, private val subscriber: SingleEmitter<T> -) : AbstractCoroutine<T>(parentContext, true) { +) : AbstractCoroutine<T>(parentContext, false, true) { override fun onCompleted(value: T) { try { subscriber.onSuccess(value) @@ -50,11 +50,12 @@ private class RxSingleCoroutine<T: Any>( override fun onCancelled(cause: Throwable, handled: Boolean) { try { - if (!subscriber.tryOnError(cause)) { - handleUndeliverableException(cause, context) + if (subscriber.tryOnError(cause)) { + return } } catch (e: Throwable) { - handleUndeliverableException(e, context) + cause.addSuppressed(e) } + handleUndeliverableException(cause, context) } } diff --git a/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt index e5399d16..cfdb6d41 100644 --- a/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt @@ -98,6 +98,31 @@ class CompletableTest : TestBase() { } } + /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their [Job] is + * cancelled. */ + @Test + fun testAwaitCancellation() = runTest { + expect(1) + val completable = CompletableSource { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + completable.await() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) + } + @Test fun testSuppressedException() = runTest { val completable = rxCompletable(currentDispatcher()) { @@ -119,7 +144,7 @@ class CompletableTest : TestBase() { } @Test - fun testUnhandledException() = runTest() { + fun testUnhandledException() = runTest { expect(1) var disposable: Disposable? = null val handler = { e: Throwable -> @@ -165,8 +190,7 @@ class CompletableTest : TestBase() { withExceptionHandler(handler) { rxCompletable(Dispatchers.Unconfined) { expect(1) - 42 - }.subscribe({ throw LinkageError() }) + }.subscribe { throw LinkageError() } finish(3) } } diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt index 8cbd7ee8..126cb818 100644 --- a/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/FlowableExceptionHandlingTest.kt @@ -38,16 +38,16 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) { + fun testFatalException() = withExceptionHandler({ expectUnreached() }) { rxFlowable<Int>(Dispatchers.Unconfined) { expect(1) throw LinkageError() }.subscribe({ expectUnreached() }, { - expect(2) // Fatal exception is reported to both onError and CEH + expect(2) // Fatal exceptions are not treated as special }) - finish(4) + finish(3) } @Test @@ -66,7 +66,7 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) { + fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) { rxFlowable<Int>(Dispatchers.Unconfined) { expect(1) throw LinkageError() @@ -77,19 +77,19 @@ class FlowableExceptionHandlingTest : TestBase() { }, { expect(2) }) - finish(4) + finish(3) } @Test - fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) { + fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) { rxFlowable(Dispatchers.Unconfined) { expect(1) send(Unit) }.subscribe({ expect(2) throw LinkageError() - }, { expect(3) }) // Fatal exception is reported to both onError and CEH - finish(5) + }, { expectUnreached() }) // Fatal exception is rethrown from `onNext` => the subscription is thought to be cancelled + finish(4) } @Test diff --git a/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt index 395672ce..1302124f 100644 --- a/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/IntegrationTest.kt @@ -125,6 +125,21 @@ class IntegrationTest( finish(3) } + @Test + fun testObservableWithTimeout() = runTest { + val observable = rxObservable<Int> { + expect(2) + withTimeout(1) { delay(100) } + } + try { + expect(1) + observable.awaitFirstOrNull() + } catch (e: CancellationException) { + expect(3) + } + finish(4) + } + private suspend fun checkNumbers(n: Int, observable: Observable<Int>) { var last = 0 observable.collect { diff --git a/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt index e0cec748..bea939ef 100644 --- a/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt @@ -7,13 +7,12 @@ package kotlinx.coroutines.rx3 import io.reactivex.rxjava3.core.* import io.reactivex.rxjava3.disposables.* import io.reactivex.rxjava3.exceptions.* -import io.reactivex.rxjava3.functions.* import io.reactivex.rxjava3.internal.functions.Functions.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* -import java.util.concurrent.CancellationException import kotlin.test.* class MaybeTest : TestBase() { @@ -47,7 +46,7 @@ class MaybeTest : TestBase() { null } expect(2) - maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, Action { + maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, { expect(5) }) expect(3) @@ -85,7 +84,7 @@ class MaybeTest : TestBase() { expectUnreached() } expect(2) - // nothing is called on a disposed rx3 maybe + // nothing is called on a disposed rx2 maybe val sub = maybe.subscribe({ expectUnreached() }, { @@ -112,18 +111,45 @@ class MaybeTest : TestBase() { @Test fun testMaybeAwait() = runBlocking { - assertEquals("OK", Maybe.just("O").await() + "K") + assertEquals("OK", Maybe.just("O").awaitSingleOrNull() + "K") + assertEquals("OK", Maybe.just("O").awaitSingle() + "K") } @Test - fun testMaybeAwaitForNull() = runBlocking { - assertNull(Maybe.empty<String>().await()) + fun testMaybeAwaitForNull(): Unit = runBlocking { + assertNull(Maybe.empty<String>().awaitSingleOrNull()) + assertFailsWith<NoSuchElementException> { Maybe.empty<String>().awaitSingle() } + } + + /** Tests that calls to [awaitSingleOrNull] throw [CancellationException] and dispose of the subscription when their + * [Job] is cancelled. */ + @Test + fun testMaybeAwaitCancellation() = runTest { + expect(1) + val maybe = MaybeSource<Int> { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + maybe.awaitSingleOrNull() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) } @Test fun testMaybeEmitAndAwait() { val maybe = rxMaybe { - Maybe.just("O").await() + "K" + Maybe.just("O").awaitSingleOrNull() + "K" } checkMaybeValue(maybe) { @@ -205,7 +231,7 @@ class MaybeTest : TestBase() { @Test fun testCancelledConsumer() = runTest { expect(1) - val maybe = rxMaybe<Int>(currentDispatcher()) { + val maybe = rxMaybe(currentDispatcher()) { expect(4) try { delay(Long.MAX_VALUE) @@ -228,6 +254,56 @@ class MaybeTest : TestBase() { finish(7) } + /** Tests the simple scenario where the Maybe doesn't output a value. */ + @Test + fun testMaybeCollectEmpty() = runTest { + expect(1) + Maybe.empty<Int>().collect { + expectUnreached() + } + finish(2) + } + + /** Tests the simple scenario where the Maybe doesn't output a value. */ + @Test + fun testMaybeCollectSingle() = runTest { + expect(1) + Maybe.just("OK").collect { + assertEquals("OK", it) + expect(2) + } + finish(3) + } + + /** Tests the behavior of [collect] when the Maybe raises an error. */ + @Test + fun testMaybeCollectThrowingMaybe() = runTest { + expect(1) + try { + Maybe.error<Int>(TestException()).collect { + expectUnreached() + } + } catch (e: TestException) { + expect(2) + } + finish(3) + } + + /** Tests the behavior of [collect] when the action throws. */ + @Test + fun testMaybeCollectThrowingAction() = runTest { + expect(1) + try { + Maybe.just("OK").collect { + expect(2) + throw TestException() + } + } catch (e: TestException) { + expect(3) + } + finish(4) + } + @Test fun testSuppressedException() = runTest { val maybe = rxMaybe(currentDispatcher()) { @@ -241,7 +317,7 @@ class MaybeTest : TestBase() { } } try { - maybe.await() + maybe.awaitSingleOrNull() expectUnreached() } catch (e: TestException) { assertTrue(e.suppressed[0] is TestException2) @@ -301,7 +377,7 @@ class MaybeTest : TestBase() { rxMaybe(Dispatchers.Unconfined) { expect(1) 42 - }.subscribe({ throw LinkageError() }) + }.subscribe { throw LinkageError() } finish(3) } } diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableCollectTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableCollectTest.kt new file mode 100644 index 00000000..680786f9 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableCollectTest.kt @@ -0,0 +1,68 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx3 + +import io.reactivex.rxjava3.core.ObservableSource +import io.reactivex.rxjava3.disposables.* +import kotlinx.coroutines.* +import org.junit.Test +import kotlin.test.* + +class ObservableCollectTest: TestBase() { + + /** Tests the behavior of [collect] when the publisher raises an error. */ + @Test + fun testObservableCollectThrowingObservable() = runTest { + expect(1) + var sum = 0 + try { + rxObservable { + for (i in 0..100) { + send(i) + } + throw TestException() + }.collect { + sum += it + } + } catch (e: TestException) { + assertTrue(sum > 0) + finish(2) + } + } + + @Test + fun testObservableCollectThrowingAction() = runTest { + expect(1) + var sum = 0 + val expectedSum = 5 + try { + var disposed = false + ObservableSource<Int> { observer -> + launch(Dispatchers.Default) { + observer.onSubscribe(object : Disposable { + override fun dispose() { + disposed = true + expect(expectedSum + 2) + } + + override fun isDisposed(): Boolean = disposed + }) + while (!disposed) { + observer.onNext(1) + } + } + }.collect { + expect(sum + 2) + sum += it + if (sum == expectedSum) { + throw TestException() + } + } + } catch (e: TestException) { + assertEquals(expectedSum, sum) + finish(expectedSum + 3) + } + } +}
\ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt index 1183b2ae..5ddb36ed 100644 --- a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt @@ -8,6 +8,7 @@ import io.reactivex.rxjava3.exceptions.* import kotlinx.coroutines.* import org.junit.* import org.junit.Test +import java.util.concurrent.* import kotlin.test.* class ObservableExceptionHandlingTest : TestBase() { @@ -18,7 +19,7 @@ class ObservableExceptionHandlingTest : TestBase() { } private inline fun <reified T : Throwable> handler(expect: Int) = { t: Throwable -> - assertTrue(t is UndeliverableException && t.cause is T) + assertTrue(t is UndeliverableException && t.cause is T, "$t") expect(expect) } @@ -38,8 +39,8 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) { - rxObservable<Int>(Dispatchers.Unconfined) { + fun testFatalException() = withExceptionHandler({ expectUnreached() }) { + rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) { expect(1) throw LinkageError() }.subscribe({ @@ -47,7 +48,7 @@ class ObservableExceptionHandlingTest : TestBase() { }, { expect(2) }) - finish(4) + finish(3) } @Test @@ -66,7 +67,7 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) { + fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) { rxObservable<Int>(Dispatchers.Unconfined) { expect(1) throw LinkageError() @@ -75,20 +76,28 @@ class ObservableExceptionHandlingTest : TestBase() { .subscribe({ expectUnreached() }, { - expect(2) // Fatal exception is not reported in onError + expect(2) // Fatal exceptions are not treated in a special manner }) - finish(4) + finish(3) } @Test - fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) { + fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) { + val latch = CountDownLatch(1) rxObservable(Dispatchers.Unconfined) { expect(1) - send(Unit) + val result = trySend(Unit) + val exception = result.exceptionOrNull() + assertTrue(exception is UndeliverableException) + assertTrue(exception.cause is LinkageError) + assertTrue(isClosedForSend) + expect(4) + latch.countDown() }.subscribe({ expect(2) throw LinkageError() - }, { expect(3) }) // Unreached because fatal errors are rethrown + }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw. + latch.await() finish(5) } @@ -100,7 +109,7 @@ class ObservableExceptionHandlingTest : TestBase() { }.subscribe({ expect(2) throw TestException() - }, { expect(3) }) // not reported to onError because came from the subscribe itself + }, { expect(3) }) finish(4) } @@ -119,7 +128,7 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) { + fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) { rxObservable(Dispatchers.Unconfined) { expect(1) send(Unit) @@ -128,7 +137,7 @@ class ObservableExceptionHandlingTest : TestBase() { .subscribe({ expect(2) throw LinkageError() - }, { expect(3) }) - finish(5) + }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw. + finish(4) } } diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt index 2a3ce046..692f0144 100644 --- a/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt @@ -5,7 +5,9 @@ package kotlinx.coroutines.rx3 import io.reactivex.rxjava3.core.* +import io.reactivex.rxjava3.disposables.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* @@ -101,7 +103,7 @@ class ObservableSingleTest : TestBase() { @Test fun testAwaitFirstOrNull() { - val observable = rxObservable<String> { + val observable = rxObservable { send(Observable.empty<String>().awaitFirstOrNull() ?: "OK") } @@ -154,6 +156,32 @@ class ObservableSingleTest : TestBase() { } } + /** Tests that calls to [awaitFirst] (and, thus, the other methods) throw [CancellationException] and dispose of + * the subscription when their [Job] is cancelled. */ + @Test + fun testAwaitCancellation() = runTest { + expect(1) + val observable = ObservableSource<Int> { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + observable.awaitFirst() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) + } + + @Test fun testExceptionFromObservable() { val observable = rxObservable { diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt index 431a7a78..01d6a20e 100644 --- a/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSourceAsFlowStressTest.kt @@ -27,7 +27,7 @@ class ObservableSourceAsFlowStressTest : TestBase() { val latch = Channel<Unit>(1) var i = 0 val observable = Observable.interval(100L, TimeUnit.MICROSECONDS) - .doOnNext { if (++i > 100) latch.offer(Unit) } + .doOnNext { if (++i > 100) latch.trySend(Unit) } val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default)) latch.receive() job.cancelAndJoin() diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt index 2f043161..58a54616 100644 --- a/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt @@ -1,12 +1,14 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.rx3 import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* import org.junit.Test +import kotlin.onSuccess import kotlin.test.* class ObservableSubscriptionSelectTest : TestBase() { @@ -22,23 +24,23 @@ class ObservableSubscriptionSelectTest : TestBase() { val channelB = source.openSubscription() loop@ while (true) { val done: Int = select { - channelA.onReceiveOrNull { - if (it != null) assertEquals(a++, it) - if (it == null) 0 else 1 + channelA.onReceiveCatching { result -> + result.onSuccess { assertEquals(a++, it) } + if (result.isSuccess) 1 else 0 } - channelB.onReceiveOrNull { - if (it != null) assertEquals(b++, it) - if (it == null) 0 else 2 + channelB.onReceiveCatching { result -> + result.onSuccess { assertEquals(b++, it) } + if (result.isSuccess) 2 else 0 } } when (done) { 0 -> break@loop 1 -> { - val r = channelB.receiveOrNull() + val r = channelB.receiveCatching().getOrNull() if (r != null) assertEquals(b++, r) } 2 -> { - val r = channelA.receiveOrNull() + val r = channelA.receiveCatching().getOrNull() if (r != null) assertEquals(a++, r) } } diff --git a/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt index 46bcaf84..3e763aa5 100644 --- a/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt @@ -9,6 +9,7 @@ import io.reactivex.rxjava3.disposables.* import io.reactivex.rxjava3.exceptions.* import io.reactivex.rxjava3.functions.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* @@ -98,6 +99,31 @@ class SingleTest : TestBase() { assertEquals("OK", Single.just("O").await() + "K") } + /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their + * [Job] is cancelled. */ + @Test + fun testSingleAwaitCancellation() = runTest { + expect(1) + val single = SingleSource<Int> { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + single.await() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) + } + @Test fun testSingleEmitAndAwait() { val single = rxSingle { @@ -221,7 +247,7 @@ class SingleTest : TestBase() { fun testFatalExceptionInSingle() = runTest { rxSingle(Dispatchers.Unconfined) { throw LinkageError() - }.subscribe({ _, e -> assertTrue(e is LinkageError); expect(1) }) + }.subscribe { _, e -> assertTrue(e is LinkageError); expect(1) } finish(2) } |