diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-rx2')
22 files changed, 594 insertions, 292 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/README.md b/reactive/kotlinx-coroutines-rx2/README.md index 40fe122f..d93f569a 100644 --- a/reactive/kotlinx-coroutines-rx2/README.md +++ b/reactive/kotlinx-coroutines-rx2/README.md @@ -25,9 +25,8 @@ Suspending extension functions and suspending iteration: | **Name** | **Description** | -------- | --------------- | [CompletableSource.await][io.reactivex.CompletableSource.await] | Awaits for completion of the completable value -| [MaybeSource.await][io.reactivex.MaybeSource.await] | Awaits for the value of the maybe and returns it or null -| [MaybeSource.awaitOrDefault][io.reactivex.MaybeSource.awaitOrDefault] | Awaits for the value of the maybe and returns it or default -| [MaybeSource.openSubscription][io.reactivex.MaybeSource.openSubscription] | Subscribes to maybe and returns [ReceiveChannel] +| [MaybeSource.awaitSingle][io.reactivex.MaybeSource.awaitSingle] | Awaits for the value of the maybe and returns it or throws an exception +| [MaybeSource.awaitSingleOrNull][io.reactivex.MaybeSource.awaitSingleOrNull] | Awaits for the value of the maybe and returns it or null | [SingleSource.await][io.reactivex.SingleSource.await] | Awaits for completion of the single value and returns it | [ObservableSource.awaitFirst][io.reactivex.ObservableSource.awaitFirst] | Awaits for the first value from the given observable | [ObservableSource.awaitFirstOrDefault][io.reactivex.ObservableSource.awaitFirstOrDefault] | Awaits for the first value from the given observable or default @@ -35,7 +34,6 @@ Suspending extension functions and suspending iteration: | [ObservableSource.awaitFirstOrNull][io.reactivex.ObservableSource.awaitFirstOrNull] | Awaits for the first value from the given observable or null | [ObservableSource.awaitLast][io.reactivex.ObservableSource.awaitFirst] | Awaits for the last value from the given observable | [ObservableSource.awaitSingle][io.reactivex.ObservableSource.awaitSingle] | Awaits for the single value from the given observable -| [ObservableSource.openSubscription][io.reactivex.ObservableSource.openSubscription] | Subscribes to observable and returns [ReceiveChannel] Note that `Flowable` is a subclass of [Reactive Streams](https://www.reactive-streams.org) `Publisher` and extensions for it are covered by @@ -47,7 +45,6 @@ Conversion functions: | -------- | --------------- | [Job.asCompletable][kotlinx.coroutines.Job.asCompletable] | Converts job to hot completable | [Deferred.asSingle][kotlinx.coroutines.Deferred.asSingle] | Converts deferred value to hot single -| [ReceiveChannel.asObservable][kotlinx.coroutines.channels.ReceiveChannel.asObservable] | Converts streaming channel to hot observable | [Scheduler.asCoroutineDispatcher][io.reactivex.Scheduler.asCoroutineDispatcher] | Converts scheduler to [CoroutineDispatcher] <!--- MODULE kotlinx-coroutines-core --> @@ -59,7 +56,6 @@ Conversion functions: <!--- INDEX kotlinx.coroutines.channels --> [ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-producer-scope/index.html -[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html <!--- INDEX kotlinx.coroutines.flow --> @@ -73,24 +69,21 @@ Conversion functions: [rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-single.html [rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-observable.html [rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html -[Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-flowable.html -[Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-observable.html -[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/as-flow.html -[io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-completable-source/await.html -[io.reactivex.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await.html -[io.reactivex.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await-or-default.html -[io.reactivex.MaybeSource.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/open-subscription.html -[io.reactivex.SingleSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-single-source/await.html -[io.reactivex.ObservableSource.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/await-first.html -[io.reactivex.ObservableSource.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/await-first-or-default.html -[io.reactivex.ObservableSource.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/await-first-or-else.html -[io.reactivex.ObservableSource.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/await-first-or-null.html -[io.reactivex.ObservableSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/await-single.html -[io.reactivex.ObservableSource.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/open-subscription.html -[kotlinx.coroutines.Job.asCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-job/as-completable.html -[kotlinx.coroutines.Deferred.asSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-deferred/as-single.html -[kotlinx.coroutines.channels.ReceiveChannel.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.channels.-receive-channel/as-observable.html -[io.reactivex.Scheduler.asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-scheduler/as-coroutine-dispatcher.html +[Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/as-flowable.html +[Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/as-observable.html +[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/as-flow.html +[io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/await.html +[io.reactivex.MaybeSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/await-single.html +[io.reactivex.MaybeSource.awaitSingleOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/await-single-or-null.html +[io.reactivex.SingleSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/await.html +[io.reactivex.ObservableSource.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/await-first.html +[io.reactivex.ObservableSource.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/await-first-or-default.html +[io.reactivex.ObservableSource.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/await-first-or-else.html +[io.reactivex.ObservableSource.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/await-first-or-null.html +[io.reactivex.ObservableSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/await-single.html +[kotlinx.coroutines.Job.asCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/as-completable.html +[kotlinx.coroutines.Deferred.asSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/as-single.html +[io.reactivex.Scheduler.asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/as-coroutine-dispatcher.html <!--- END --> diff --git a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api index 4370325f..c27ef4d7 100644 --- a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api +++ b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api @@ -8,21 +8,23 @@ public final class kotlinx/coroutines/rx2/RxAwaitKt { public static final fun awaitFirstOrNull (Lio/reactivex/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun awaitLast (Lio/reactivex/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun awaitOrDefault (Lio/reactivex/MaybeSource;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitSingle (Lio/reactivex/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun awaitSingle (Lio/reactivex/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitSingleOrNull (Lio/reactivex/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public final class kotlinx/coroutines/rx2/RxChannelKt { public static final fun collect (Lio/reactivex/MaybeSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun collect (Lio/reactivex/ObservableSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static final fun consumeEach (Lio/reactivex/MaybeSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static final fun consumeEach (Lio/reactivex/ObservableSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun openSubscription (Lio/reactivex/MaybeSource;)Lkotlinx/coroutines/channels/ReceiveChannel; public static final fun openSubscription (Lio/reactivex/ObservableSource;)Lkotlinx/coroutines/channels/ReceiveChannel; + public static final fun toChannel (Lio/reactivex/MaybeSource;)Lkotlinx/coroutines/channels/ReceiveChannel; + public static final fun toChannel (Lio/reactivex/ObservableSource;)Lkotlinx/coroutines/channels/ReceiveChannel; } public final class kotlinx/coroutines/rx2/RxCompletableKt { public static final fun rxCompletable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Completable; - public static final fun rxCompletable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Completable; + public static final synthetic fun rxCompletable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Completable; public static synthetic fun rxCompletable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Completable; public static synthetic fun rxCompletable$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Completable; } @@ -47,21 +49,21 @@ public final class kotlinx/coroutines/rx2/RxConvertKt { public final class kotlinx/coroutines/rx2/RxFlowableKt { public static final fun rxFlowable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Flowable; - public static final fun rxFlowable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Flowable; + public static final synthetic fun rxFlowable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Flowable; public static synthetic fun rxFlowable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Flowable; public static synthetic fun rxFlowable$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Flowable; } public final class kotlinx/coroutines/rx2/RxMaybeKt { public static final fun rxMaybe (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Maybe; - public static final fun rxMaybe (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Maybe; + public static final synthetic fun rxMaybe (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Maybe; public static synthetic fun rxMaybe$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Maybe; public static synthetic fun rxMaybe$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Maybe; } public final class kotlinx/coroutines/rx2/RxObservableKt { public static final fun rxObservable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Observable; - public static final fun rxObservable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Observable; + public static final synthetic fun rxObservable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Observable; public static synthetic fun rxObservable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Observable; public static synthetic fun rxObservable$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Observable; } @@ -72,7 +74,7 @@ public final class kotlinx/coroutines/rx2/RxSchedulerKt { public final class kotlinx/coroutines/rx2/RxSingleKt { public static final fun rxSingle (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Single; - public static final fun rxSingle (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Single; + public static final synthetic fun rxSingle (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Single; public static synthetic fun rxSingle$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Single; public static synthetic fun rxSingle$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Single; } diff --git a/reactive/kotlinx-coroutines-rx2/build.gradle b/reactive/kotlinx-coroutines-rx2/build.gradle index 73f76c3d..b6fd9327 100644 --- a/reactive/kotlinx-coroutines-rx2/build.gradle +++ b/reactive/kotlinx-coroutines-rx2/build.gradle @@ -1,18 +1,21 @@ /* * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ +import org.jetbrains.dokka.gradle.DokkaTaskPartial dependencies { - compile project(':kotlinx-coroutines-reactive') - testCompile project(':kotlinx-coroutines-reactive').sourceSets.test.output - testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version" - compile "io.reactivex.rxjava2:rxjava:$rxjava2_version" + api project(':kotlinx-coroutines-reactive') + testImplementation project(':kotlinx-coroutines-reactive').sourceSets.test.output + testImplementation "org.reactivestreams:reactive-streams-tck:$reactive_streams_version" + api "io.reactivex.rxjava2:rxjava:$rxjava2_version" } -tasks.withType(dokka.getClass()) { - externalDocumentationLink { - url = new URL('http://reactivex.io/RxJava/2.x/javadoc/') - packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL() +tasks.withType(DokkaTaskPartial.class) { + dokkaSourceSets.configureEach { + externalDocumentationLink { + url.set(new URL('http://reactivex.io/RxJava/2.x/javadoc/')) + packageListUrl.set(projectDir.toPath().resolve("package.list").toUri().toURL()) + } } } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt index 6e162c9a..0e0b47eb 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt @@ -15,11 +15,12 @@ import kotlin.coroutines.* // ------------------------ CompletableSource ------------------------ /** - * Awaits for completion of this completable without blocking a thread. - * Returns `Unit` or throws the corresponding exception if this completable had produced error. + * Awaits for completion of this completable without blocking the thread. + * Returns `Unit`, or throws the corresponding exception if this completable produces an error. * * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this - * suspending function is suspended, this function immediately resumes with [CancellationException]. + * suspending function is suspended, this function immediately resumes with [CancellationException] and disposes of its + * subscription. */ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont -> subscribe(object : CompletableObserver { @@ -32,6 +33,37 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine // ------------------------ MaybeSource ------------------------ /** + * Awaits for completion of the [MaybeSource] without blocking the thread. + * Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this + * [MaybeSource] produces an error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this + * function immediately resumes with [CancellationException] and disposes of its subscription. + */ +@Suppress("UNCHECKED_CAST") +public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> + subscribe(object : MaybeObserver<T> { + override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } + override fun onComplete() { cont.resume(null) } + override fun onSuccess(t: T) { cont.resume(t) } + override fun onError(error: Throwable) { cont.resumeWithException(error) } + }) +} + +/** + * Awaits for completion of the [MaybeSource] without blocking the thread. + * Returns the resulting value, or throws if either no value is produced or this [MaybeSource] produces an error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this + * function immediately resumes with [CancellationException] and disposes of its subscription. + * + * @throws NoSuchElementException if no elements were produced by this [MaybeSource]. + */ +public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException() + +/** * Awaits for completion of the maybe without blocking a thread. * Returns the resulting value, null if no value was produced or throws the corresponding exception if this * maybe had produced error. @@ -39,9 +71,19 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. + * + * ### Deprecation + * + * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of + * a value, as opposed to throwing in such case. + * @suppress */ -@Suppress("UNCHECKED_CAST") -public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).awaitOrDefault(null) +@Deprecated( + message = "Deprecated in favor of awaitSingleOrNull()", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull()") +) // Warning since 1.5, error in 1.6, hidden in 1.7 +public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull() /** * Awaits for completion of the maybe without blocking a thread. @@ -51,25 +93,29 @@ public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).aw * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. + * + * ### Deprecation + * + * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for + * details). + * @suppress */ -public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont -> - subscribe(object : MaybeObserver<T> { - override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } - override fun onComplete() { cont.resume(default) } - override fun onSuccess(t: T) { cont.resume(t) } - override fun onError(error: Throwable) { cont.resumeWithException(error) } - }) -} +@Deprecated( + message = "Deprecated in favor of awaitSingleOrNull()", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default") +) // Warning since 1.5, error in 1.6, hidden in 1.7 +public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default // ------------------------ SingleSource ------------------------ /** - * Awaits for completion of the single value without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this single had produced error. + * Awaits for completion of the single value response without blocking the thread. + * Returns the resulting value, or throws the corresponding exception if this response produces an error. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont -> subscribe(object : SingleObserver<T> { @@ -82,69 +128,73 @@ public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine // ------------------------ ObservableSource ------------------------ /** - * Awaits for the first value from the given observable without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable] without blocking the thread and returns the resulting value, or, + * if the observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. * - * @throws NoSuchElementException if observable does not emit any value + * @throws NoSuchElementException if the observable does not emit any value */ public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST) /** - * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without + * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the + * corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default) /** - * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the + * thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding + * exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT) /** - * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted, + * without blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws + * the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ -public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue() +public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = + awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue() /** - * Awaits for the last value from the given observable without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the last value from the given [Observable] without blocking the thread and + * returns the resulting value, or, if this observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. * - * @throws NoSuchElementException if observable does not emit any value + * @throws NoSuchElementException if the observable does not emit any value */ public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST) /** - * Awaits for the single value from the given observable without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or, + * if this observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. * - * @throws NoSuchElementException if observable does not emit any value - * @throws IllegalArgumentException if observable emits more than one value + * @throws NoSuchElementException if the observable does not emit any value + * @throws IllegalArgumentException if the observable emits more than one value */ public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt index 0fe43f1c..3e39033e 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt @@ -20,6 +20,7 @@ internal fun handleUndeliverableException(cause: Throwable, context: CoroutineCo try { RxJavaPlugins.onError(cause) } catch (e: Throwable) { + cause.addSuppressed(e) handleCoroutineException(context, cause) } } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt index a129196a..bb093b07 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt @@ -9,6 +9,8 @@ import io.reactivex.disposables.* import kotlinx.atomicfu.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.internal.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.reactive.* /** * Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it. @@ -16,8 +18,9 @@ import kotlinx.coroutines.internal.* * * This API is deprecated in the favour of [Flow]. * [MaybeSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first. + * @suppress */ -@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.WARNING) // Will be hidden in 1.4 +@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.ERROR) // Will be hidden in 1.5 public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> { val channel = SubscriptionChannel<T>() subscribe(channel) @@ -30,37 +33,46 @@ public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> { * * This API is deprecated in the favour of [Flow]. * [ObservableSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first. + * @suppress */ -@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.WARNING) // Will be hidden in 1.4 +@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.ERROR) // Will be hidden in 1.5 public fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> { val channel = SubscriptionChannel<T>() subscribe(channel) return channel } -// Will be promoted to error in 1.3.0, removed in 1.4.0 -@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)")) -public suspend inline fun <T> MaybeSource<T>.consumeEach(action: (T) -> Unit): Unit = - openSubscription().consumeEach(action) - -// Will be promoted to error in 1.3.0, removed in 1.4.0 -@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)")) -public suspend inline fun <T> ObservableSource<T>.consumeEach(action: (T) -> Unit): Unit = - openSubscription().consumeEach(action) - /** * Subscribes to this [MaybeSource] and performs the specified action for each received element. - * Cancels subscription if any exception happens during collect. + * + * If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown from + * [collect]. */ public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit = - openSubscription().consumeEach(action) + toChannel().consumeEach(action) /** * Subscribes to this [ObservableSource] and performs the specified action for each received element. - * Cancels subscription if any exception happens during collect. + * + * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from + * [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect]. */ public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit = - openSubscription().consumeEach(action) + toChannel().consumeEach(action) + +@PublishedApi +internal fun <T> MaybeSource<T>.toChannel(): ReceiveChannel<T> { + val channel = SubscriptionChannel<T>() + subscribe(channel) + return channel +} + +@PublishedApi +internal fun <T> ObservableSource<T>.toChannel(): ReceiveChannel<T> { + val channel = SubscriptionChannel<T>() + subscribe(channel) + return channel +} @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") private class SubscriptionChannel<T> : @@ -79,11 +91,12 @@ private class SubscriptionChannel<T> : } override fun onSuccess(t: T) { - offer(t) + trySend(t) + close(cause = null) } override fun onNext(t: T) { - offer(t) + trySend(t) // Safe to ignore return value here, expectedly racing with cancellation } override fun onComplete() { diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt index d0a43fb1..3f915382 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt @@ -2,14 +2,11 @@ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - package kotlinx.coroutines.rx2 import io.reactivex.* import kotlinx.coroutines.* import kotlin.coroutines.* -import kotlin.internal.* /** * Creates cold [Completable] that runs a given [block] in a coroutine and emits its result. @@ -28,17 +25,6 @@ public fun rxCompletable( return rxCompletableInternal(GlobalScope, context, block) } -@Deprecated( - message = "CoroutineScope.rxCompletable is deprecated in favour of top-level rxCompletable", - level = DeprecationLevel.ERROR, - replaceWith = ReplaceWith("rxCompletable(context, block)") -) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 -@LowPriorityInOverloadResolution -public fun CoroutineScope.rxCompletable( - context: CoroutineContext = EmptyCoroutineContext, - block: suspend CoroutineScope.() -> Unit -): Completable = rxCompletableInternal(this, context, block) - private fun rxCompletableInternal( scope: CoroutineScope, // support for legacy rxCompletable in scope context: CoroutineContext, @@ -53,7 +39,7 @@ private fun rxCompletableInternal( private class RxCompletableCoroutine( parentContext: CoroutineContext, private val subscriber: CompletableEmitter -) : AbstractCoroutine<Unit>(parentContext, true) { +) : AbstractCoroutine<Unit>(parentContext, false, true) { override fun onCompleted(value: Unit) { try { subscriber.onComplete() @@ -64,11 +50,25 @@ private class RxCompletableCoroutine( override fun onCancelled(cause: Throwable, handled: Boolean) { try { - if (!subscriber.tryOnError(cause)) { - handleUndeliverableException(cause, context) + if (subscriber.tryOnError(cause)) { + return } } catch (e: Throwable) { - handleUndeliverableException(e, context) + cause.addSuppressed(e) } + handleUndeliverableException(cause, context) } } + +/** + * @suppress + */ +@Deprecated( + message = "CoroutineScope.rxCompletable is deprecated in favour of top-level rxCompletable", + level = DeprecationLevel.HIDDEN, + replaceWith = ReplaceWith("rxCompletable(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 +public fun CoroutineScope.rxCompletable( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> Unit +): Completable = rxCompletableInternal(this, context, block) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index 14c24942..497c922c 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -26,7 +26,6 @@ import kotlin.coroutines.* * * @param context -- the coroutine context from which the resulting completable is going to be signalled */ -@ExperimentalCoroutinesApi public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) { this@asCompletable.join() } @@ -43,7 +42,6 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet * * @param context -- the coroutine context from which the resulting maybe is going to be signalled */ -@ExperimentalCoroutinesApi public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) { this@asMaybe.await() } @@ -60,7 +58,6 @@ public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMay * * @param context -- the coroutine context from which the resulting single is going to be signalled */ -@ExperimentalCoroutinesApi public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle(context) { this@asSingle.await() } @@ -75,17 +72,20 @@ public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> * resulting flow to specify a user-defined value and to control what happens when data is produced faster * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. */ -@ExperimentalCoroutinesApi public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow { val disposableRef = AtomicReference<Disposable>() val observer = object : Observer<T> { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } override fun onNext(t: T) { + /* + * Channel was closed by the downstream, so the exception (if any) + * also was handled by the same downstream + */ try { - sendBlocking(t) - } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 - // Is handled by the downstream flow + trySendBlocking(t) + } catch (e: InterruptedException) { + // RxJava interrupts the source } } override fun onError(e: Throwable) { close(e) } @@ -104,7 +104,6 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow { * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ -@ExperimentalCoroutinesApi public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter -> /* * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if @@ -137,7 +136,6 @@ public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCorout * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ -@ExperimentalCoroutinesApi public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> = Flowable.fromPublisher(asPublisher(context)) @@ -151,6 +149,7 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): send(t) } +/** @suppress **/ @Suppress("UNUSED") // KT-42513 @JvmOverloads // binary compatibility @JvmName("from") @@ -158,6 +157,7 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> = asFlowable(context) +/** @suppress **/ @Suppress("UNUSED") // KT-42513 @JvmOverloads // binary compatibility @JvmName("from") diff --git a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt index 8dfe9576..c856bb4e 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt @@ -31,7 +31,6 @@ import kotlin.internal.* * * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect */ -@ExperimentalCoroutinesApi public fun <T: Any> rxFlowable( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope<T>.() -> Unit @@ -41,9 +40,10 @@ public fun <T: Any> rxFlowable( return Flowable.fromPublisher(publishInternal(GlobalScope, context, RX_HANDLER, block)) } +/** @suppress */ @Deprecated( message = "CoroutineScope.rxFlowable is deprecated in favour of top-level rxFlowable", - level = DeprecationLevel.ERROR, + level = DeprecationLevel.HIDDEN, replaceWith = ReplaceWith("rxFlowable(context, block)") ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 @LowPriorityInOverloadResolution diff --git a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt index f5ed48b9..ab713123 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt @@ -2,14 +2,11 @@ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - package kotlinx.coroutines.rx2 import io.reactivex.* import kotlinx.coroutines.* import kotlin.coroutines.* -import kotlin.internal.* /** * Creates cold [maybe][Maybe] that will run a given [block] in a coroutine and emits its result. @@ -29,17 +26,6 @@ public fun <T> rxMaybe( return rxMaybeInternal(GlobalScope, context, block) } -@Deprecated( - message = "CoroutineScope.rxMaybe is deprecated in favour of top-level rxMaybe", - level = DeprecationLevel.ERROR, - replaceWith = ReplaceWith("rxMaybe(context, block)") -) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 -@LowPriorityInOverloadResolution -public fun <T> CoroutineScope.rxMaybe( - context: CoroutineContext = EmptyCoroutineContext, - block: suspend CoroutineScope.() -> T? -): Maybe<T> = rxMaybeInternal(this, context, block) - private fun <T> rxMaybeInternal( scope: CoroutineScope, // support for legacy rxMaybe in scope context: CoroutineContext, @@ -54,7 +40,7 @@ private fun <T> rxMaybeInternal( private class RxMaybeCoroutine<T>( parentContext: CoroutineContext, private val subscriber: MaybeEmitter<T> -) : AbstractCoroutine<T>(parentContext, true) { +) : AbstractCoroutine<T>(parentContext, false, true) { override fun onCompleted(value: T) { try { if (value == null) subscriber.onComplete() else subscriber.onSuccess(value) @@ -65,11 +51,23 @@ private class RxMaybeCoroutine<T>( override fun onCancelled(cause: Throwable, handled: Boolean) { try { - if (!subscriber.tryOnError(cause)) { - handleUndeliverableException(cause, context) + if (subscriber.tryOnError(cause)) { + return } } catch (e: Throwable) { - handleUndeliverableException(e, context) + cause.addSuppressed(e) } + handleUndeliverableException(cause, context) } } + +/** @suppress */ +@Deprecated( + message = "CoroutineScope.rxMaybe is deprecated in favour of top-level rxMaybe", + level = DeprecationLevel.HIDDEN, + replaceWith = ReplaceWith("rxMaybe(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 +public fun <T> CoroutineScope.rxMaybe( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> T? +): Maybe<T> = rxMaybeInternal(this, context, block) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 6d11cb9c..5f409815 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -2,8 +2,6 @@ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - package kotlinx.coroutines.rx2 import io.reactivex.* @@ -11,10 +9,10 @@ import io.reactivex.exceptions.* import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* +import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import kotlin.coroutines.* -import kotlin.internal.* /** * Creates cold [observable][Observable] that will run a given [block] in a coroutine. @@ -32,7 +30,6 @@ import kotlin.internal.* * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. */ -@ExperimentalCoroutinesApi public fun <T : Any> rxObservable( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope<T>.() -> Unit @@ -42,17 +39,6 @@ public fun <T : Any> rxObservable( return rxObservableInternal(GlobalScope, context, block) } -@Deprecated( - message = "CoroutineScope.rxObservable is deprecated in favour of top-level rxObservable", - level = DeprecationLevel.ERROR, - replaceWith = ReplaceWith("rxObservable(context, block)") -) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 -@LowPriorityInOverloadResolution -public fun <T : Any> CoroutineScope.rxObservable( - context: CoroutineContext = EmptyCoroutineContext, - @BuilderInference block: suspend ProducerScope<T>.() -> Unit -): Observable<T> = rxObservableInternal(this, context, block) - private fun <T : Any> rxObservableInternal( scope: CoroutineScope, // support for legacy rxObservable in scope context: CoroutineContext, @@ -68,39 +54,35 @@ private const val OPEN = 0 // open channel, still working private const val CLOSED = -1 // closed, but have not signalled onCompleted/onError yet private const val SIGNALLED = -2 // already signalled subscriber onCompleted/onError -private class RxObservableCoroutine<T: Any>( +private class RxObservableCoroutine<T : Any>( parentContext: CoroutineContext, private val subscriber: ObservableEmitter<T> -) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> { +) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> { override val channel: SendChannel<T> get() = this - // Mutex is locked when while subscriber.onXXX is being invoked + // Mutex is locked while subscriber.onXXX is being invoked private val mutex = Mutex() private val _signal = atomic(OPEN) - override val isClosedForSend: Boolean get() = isCompleted - override val isFull: Boolean = mutex.isLocked + override val isClosedForSend: Boolean get() = !isActive override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause) override fun invokeOnClose(handler: (Throwable?) -> Unit) = throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose") - override fun offer(element: T): Boolean { - if (!mutex.tryLock()) return false - doLockedNext(element) - return true - } + override fun trySend(element: T): ChannelResult<Unit> = + if (!mutex.tryLock()) { + ChannelResult.failure() + } else { + when (val throwable = doLockedNext(element)) { + null -> ChannelResult.success(Unit) + else -> ChannelResult.closed(throwable) + } + } public override suspend fun send(element: T) { - // fast-path -- try send without suspension - if (offer(element)) return - // slow-path does suspend - return sendSuspend(element) - } - - private suspend fun sendSuspend(element: T) { mutex.lock() - doLockedNext(element) + doLockedNext(element)?.let { throw it } } override val onSend: SelectClause2<T, SendChannel<T>> @@ -108,30 +90,39 @@ private class RxObservableCoroutine<T: Any>( // registerSelectSend @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE") - override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) { + override fun <R> registerSelectClause2( + select: SelectInstance<R>, + element: T, + block: suspend (SendChannel<T>) -> R + ) { mutex.onLock.registerSelectClause2(select, null) { - doLockedNext(element) + doLockedNext(element)?.let { throw it } block(this) } } // assert: mutex.isLocked() - private fun doLockedNext(elem: T) { + private fun doLockedNext(elem: T): Throwable? { // check if already closed for send if (!isActive) { doLockedSignalCompleted(completionCause, completionCauseHandled) - throw getCancellationException() + return getCancellationException() } // notify subscriber try { subscriber.onNext(elem) } catch (e: Throwable) { - // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it - // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery, - // this failure is essentially equivalent to a failure of a child coroutine. - cancelCoroutine(e) - mutex.unlock() - throw e + val cause = UndeliverableException(e) + val causeDelivered = close(cause) + unlockAndCheckCompleted() + return if (causeDelivered) { + // `cause` is the reason this channel is closed + cause + } else { + // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception. + handleUndeliverableException(cause, context) + getCancellationException() + } } /* * There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might @@ -140,6 +131,7 @@ private class RxObservableCoroutine<T: Any>( * We have to recheck `isCompleted` after `unlock` anyway. */ unlockAndCheckCompleted() + return null } private fun unlockAndCheckCompleted() { @@ -153,33 +145,31 @@ private class RxObservableCoroutine<T: Any>( private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) { // cancellation failures try { - if (_signal.value >= CLOSED) { - _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) + if (_signal.value == SIGNALLED) + return + _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) + @Suppress("INVISIBLE_MEMBER") + val unwrappedCause = cause?.let { unwrap(it) } + if (unwrappedCause == null) { try { - if (cause != null && cause !is CancellationException) { - /* - * Reactive frameworks have two types of exceptions: regular and fatal. - * Regular are passed to onError. - * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297). - * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether - * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was - * thrown by subscriber or upstream). - * To make behaviour consistent and least surprising, we always handle fatal exceptions - * by coroutines machinery, anyway, they should not be present in regular program flow, - * thus our goal here is just to expose it as soon as possible. - */ - subscriber.tryOnError(cause) - if (!handled && cause.isFatal()) { - handleUndeliverableException(cause, context) - } - } - else { - subscriber.onComplete() - } - } catch (e: Throwable) { - // Unhandled exception (cannot handle in other way, since we are already complete) + subscriber.onComplete() + } catch (e: Exception) { handleUndeliverableException(e, context) } + } else if (unwrappedCause is UndeliverableException && !handled) { + /** Such exceptions are not reported to `onError`, as, according to the reactive specifications, + * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already + * cancelled. */ + handleUndeliverableException(cause, context) + } else if (unwrappedCause !== getCancellationException() || !subscriber.isDisposed) { + try { + /** If the subscriber is already in a terminal state, the error will be signalled to + * `RxJavaPlugins.onError`. */ + subscriber.onError(cause) + } catch (e: Exception) { + cause.addSuppressed(e) + handleUndeliverableException(cause, context) + } } } finally { mutex.unlock() @@ -201,9 +191,13 @@ private class RxObservableCoroutine<T: Any>( } } -internal fun Throwable.isFatal() = try { - Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode - false -} catch (e: Throwable) { - true -} +/** @suppress */ +@Deprecated( + message = "CoroutineScope.rxObservable is deprecated in favour of top-level rxObservable", + level = DeprecationLevel.HIDDEN, + replaceWith = ReplaceWith("rxObservable(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 +public fun <T : Any> CoroutineScope.rxObservable( + context: CoroutineContext = EmptyCoroutineContext, + @BuilderInference block: suspend ProducerScope<T>.() -> Unit +): Observable<T> = rxObservableInternal(this, context, block) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt index b8012b6d..27842a21 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt @@ -2,14 +2,11 @@ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - package kotlinx.coroutines.rx2 import io.reactivex.* import kotlinx.coroutines.* import kotlin.coroutines.* -import kotlin.internal.* /** * Creates cold [single][Single] that will run a given [block] in a coroutine and emits its result. @@ -28,17 +25,6 @@ public fun <T : Any> rxSingle( return rxSingleInternal(GlobalScope, context, block) } -@Deprecated( - message = "CoroutineScope.rxSingle is deprecated in favour of top-level rxSingle", - level = DeprecationLevel.ERROR, - replaceWith = ReplaceWith("rxSingle(context, block)") -) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 -@LowPriorityInOverloadResolution -public fun <T : Any> CoroutineScope.rxSingle( - context: CoroutineContext = EmptyCoroutineContext, - block: suspend CoroutineScope.() -> T -): Single<T> = rxSingleInternal(this, context, block) - private fun <T : Any> rxSingleInternal( scope: CoroutineScope, // support for legacy rxSingle in scope context: CoroutineContext, @@ -53,7 +39,7 @@ private fun <T : Any> rxSingleInternal( private class RxSingleCoroutine<T: Any>( parentContext: CoroutineContext, private val subscriber: SingleEmitter<T> -) : AbstractCoroutine<T>(parentContext, true) { +) : AbstractCoroutine<T>(parentContext, false, true) { override fun onCompleted(value: T) { try { subscriber.onSuccess(value) @@ -64,11 +50,23 @@ private class RxSingleCoroutine<T: Any>( override fun onCancelled(cause: Throwable, handled: Boolean) { try { - if (!subscriber.tryOnError(cause)) { - handleUndeliverableException(cause, context) + if (subscriber.tryOnError(cause)) { + return } } catch (e: Throwable) { - handleUndeliverableException(e, context) + cause.addSuppressed(e) } + handleUndeliverableException(cause, context) } } + +/** @suppress */ +@Deprecated( + message = "CoroutineScope.rxSingle is deprecated in favour of top-level rxSingle", + level = DeprecationLevel.HIDDEN, + replaceWith = ReplaceWith("rxSingle(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 +public fun <T : Any> CoroutineScope.rxSingle( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> T +): Single<T> = rxSingleInternal(this, context, block) diff --git a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt index 298b32bf..16f0005b 100644 --- a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt @@ -98,6 +98,31 @@ class CompletableTest : TestBase() { } } + /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their [Job] is + * cancelled. */ + @Test + fun testAwaitCancellation() = runTest { + expect(1) + val completable = CompletableSource { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + completable.await() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) + } + @Test fun testSuppressedException() = runTest { val completable = rxCompletable(currentDispatcher()) { @@ -119,7 +144,7 @@ class CompletableTest : TestBase() { } @Test - fun testUnhandledException() = runTest() { + fun testUnhandledException() = runTest { expect(1) var disposable: Disposable? = null val handler = { e: Throwable -> @@ -165,8 +190,7 @@ class CompletableTest : TestBase() { withExceptionHandler(handler) { rxCompletable(Dispatchers.Unconfined) { expect(1) - 42 - }.subscribe({ throw LinkageError() }) + }.subscribe { throw LinkageError() } finish(3) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt index 05b7ee92..31643929 100644 --- a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt @@ -38,16 +38,16 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) { + fun testFatalException() = withExceptionHandler({ expectUnreached() }) { rxFlowable<Int>(Dispatchers.Unconfined) { expect(1) throw LinkageError() }.subscribe({ expectUnreached() }, { - expect(2) // Fatal exception is reported to both onError and CEH + expect(2) // Fatal exceptions are not treated as special }) - finish(4) + finish(3) } @Test @@ -66,7 +66,7 @@ class FlowableExceptionHandlingTest : TestBase() { } @Test - fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) { + fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) { rxFlowable<Int>(Dispatchers.Unconfined) { expect(1) throw LinkageError() @@ -77,19 +77,19 @@ class FlowableExceptionHandlingTest : TestBase() { }, { expect(2) }) - finish(4) + finish(3) } @Test - fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) { + fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) { rxFlowable(Dispatchers.Unconfined) { expect(1) send(Unit) }.subscribe({ expect(2) throw LinkageError() - }, { expect(3) }) // Fatal exception is reported to both onError and CEH - finish(5) + }, { expectUnreached() }) // Fatal exception is rethrown from `onNext` => the subscription is thought to be cancelled + finish(4) } @Test diff --git a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt index 540fa76b..8a6362ad 100644 --- a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* +import kotlinx.coroutines.reactive.* import org.junit.Test import org.junit.runner.* import org.junit.runners.* @@ -92,7 +93,7 @@ class IntegrationTest( assertEquals(n, observable.awaitLast()) assertFailsWith<IllegalArgumentException> { observable.awaitSingle() } checkNumbers(n, observable) - val channel = observable.openSubscription() + val channel = observable.toChannel() checkNumbers(n, channel.consumeAsFlow().asObservable(ctx(coroutineContext))) channel.cancel() } @@ -124,6 +125,21 @@ class IntegrationTest( finish(3) } + @Test + fun testObservableWithTimeout() = runTest { + val observable = rxObservable<Int> { + expect(2) + withTimeout(1) { delay(100) } + } + try { + expect(1) + observable.awaitFirstOrNull() + } catch (e: CancellationException) { + expect(3) + } + finish(4) + } + private suspend fun checkNumbers(n: Int, observable: Observable<Int>) { var last = 0 observable.collect { diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt index 08427dcf..f5d128d3 100644 --- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt @@ -7,13 +7,12 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import io.reactivex.disposables.* import io.reactivex.exceptions.* -import io.reactivex.functions.* import io.reactivex.internal.functions.Functions.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* -import java.util.concurrent.CancellationException import kotlin.test.* class MaybeTest : TestBase() { @@ -47,7 +46,7 @@ class MaybeTest : TestBase() { null } expect(2) - maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, Action { + maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, { expect(5) }) expect(3) @@ -112,18 +111,45 @@ class MaybeTest : TestBase() { @Test fun testMaybeAwait() = runBlocking { - assertEquals("OK", Maybe.just("O").await() + "K") + assertEquals("OK", Maybe.just("O").awaitSingleOrNull() + "K") + assertEquals("OK", Maybe.just("O").awaitSingle() + "K") } @Test - fun testMaybeAwaitForNull() = runBlocking { - assertNull(Maybe.empty<String>().await()) + fun testMaybeAwaitForNull(): Unit = runBlocking { + assertNull(Maybe.empty<String>().awaitSingleOrNull()) + assertFailsWith<NoSuchElementException> { Maybe.empty<String>().awaitSingle() } + } + + /** Tests that calls to [awaitSingleOrNull] throw [CancellationException] and dispose of the subscription when their + * [Job] is cancelled. */ + @Test + fun testMaybeAwaitCancellation() = runTest { + expect(1) + val maybe = MaybeSource<Int> { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + maybe.awaitSingleOrNull() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) } @Test fun testMaybeEmitAndAwait() { val maybe = rxMaybe { - Maybe.just("O").await() + "K" + Maybe.just("O").awaitSingleOrNull() + "K" } checkMaybeValue(maybe) { @@ -205,7 +231,7 @@ class MaybeTest : TestBase() { @Test fun testCancelledConsumer() = runTest { expect(1) - val maybe = rxMaybe<Int>(currentDispatcher()) { + val maybe = rxMaybe(currentDispatcher()) { expect(4) try { delay(Long.MAX_VALUE) @@ -228,6 +254,56 @@ class MaybeTest : TestBase() { finish(7) } + /** Tests the simple scenario where the Maybe doesn't output a value. */ + @Test + fun testMaybeCollectEmpty() = runTest { + expect(1) + Maybe.empty<Int>().collect { + expectUnreached() + } + finish(2) + } + + /** Tests the simple scenario where the Maybe doesn't output a value. */ + @Test + fun testMaybeCollectSingle() = runTest { + expect(1) + Maybe.just("OK").collect { + assertEquals("OK", it) + expect(2) + } + finish(3) + } + + /** Tests the behavior of [collect] when the Maybe raises an error. */ + @Test + fun testMaybeCollectThrowingMaybe() = runTest { + expect(1) + try { + Maybe.error<Int>(TestException()).collect { + expectUnreached() + } + } catch (e: TestException) { + expect(2) + } + finish(3) + } + + /** Tests the behavior of [collect] when the action throws. */ + @Test + fun testMaybeCollectThrowingAction() = runTest { + expect(1) + try { + Maybe.just("OK").collect { + expect(2) + throw TestException() + } + } catch (e: TestException) { + expect(3) + } + finish(4) + } + @Test fun testSuppressedException() = runTest { val maybe = rxMaybe(currentDispatcher()) { @@ -241,7 +317,7 @@ class MaybeTest : TestBase() { } } try { - maybe.await() + maybe.awaitSingleOrNull() expectUnreached() } catch (e: TestException) { assertTrue(e.suppressed[0] is TestException2) @@ -301,7 +377,7 @@ class MaybeTest : TestBase() { rxMaybe(Dispatchers.Unconfined) { expect(1) 42 - }.subscribe({ throw LinkageError() }) + }.subscribe { throw LinkageError() } finish(3) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableCollectTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableCollectTest.kt new file mode 100644 index 00000000..508f594a --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableCollectTest.kt @@ -0,0 +1,69 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import io.reactivex.* +import io.reactivex.disposables.* +import kotlinx.coroutines.* +import org.junit.Test +import kotlin.test.* + +class ObservableCollectTest: TestBase() { + + /** Tests the behavior of [collect] when the publisher raises an error. */ + @Test + fun testObservableCollectThrowingObservable() = runTest { + expect(1) + var sum = 0 + try { + rxObservable { + for (i in 0..100) { + send(i) + } + throw TestException() + }.collect { + sum += it + } + } catch (e: TestException) { + assertTrue(sum > 0) + finish(2) + } + } + + /** Tests the behavior of [collect] when the action throws. */ + @Test + fun testObservableCollectThrowingAction() = runTest { + expect(1) + var sum = 0 + val expectedSum = 5 + try { + var disposed = false + ObservableSource<Int> { observer -> + launch(Dispatchers.Default) { + observer.onSubscribe(object : Disposable { + override fun dispose() { + disposed = true + expect(expectedSum + 2) + } + + override fun isDisposed(): Boolean = disposed + }) + while (!disposed) { + observer.onNext(1) + } + } + }.collect { + expect(sum + 2) + sum += it + if (sum == expectedSum) { + throw TestException() + } + } + } catch (e: TestException) { + assertEquals(expectedSum, sum) + finish(expectedSum + 3) + } + } +}
\ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt index d6cdd3ca..fb3d0f69 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt @@ -8,6 +8,7 @@ import io.reactivex.exceptions.* import kotlinx.coroutines.* import org.junit.* import org.junit.Test +import java.util.concurrent.* import kotlin.test.* class ObservableExceptionHandlingTest : TestBase() { @@ -18,7 +19,7 @@ class ObservableExceptionHandlingTest : TestBase() { } private inline fun <reified T : Throwable> handler(expect: Int) = { t: Throwable -> - assertTrue(t is UndeliverableException && t.cause is T) + assertTrue(t is UndeliverableException && t.cause is T, "$t") expect(expect) } @@ -38,8 +39,8 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) { - rxObservable<Int>(Dispatchers.Unconfined) { + fun testFatalException() = withExceptionHandler({ expectUnreached() }) { + rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) { expect(1) throw LinkageError() }.subscribe({ @@ -47,7 +48,7 @@ class ObservableExceptionHandlingTest : TestBase() { }, { expect(2) }) - finish(4) + finish(3) } @Test @@ -66,7 +67,7 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) { + fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) { rxObservable<Int>(Dispatchers.Unconfined) { expect(1) throw LinkageError() @@ -75,20 +76,28 @@ class ObservableExceptionHandlingTest : TestBase() { .subscribe({ expectUnreached() }, { - expect(2) // Fatal exception is not reported in onError + expect(2) // Fatal exceptions are not treated in a special manner }) - finish(4) + finish(3) } @Test - fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) { + fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) { + val latch = CountDownLatch(1) rxObservable(Dispatchers.Unconfined) { expect(1) - send(Unit) + val result = trySend(Unit) + val exception = result.exceptionOrNull() + assertTrue(exception is UndeliverableException) + assertTrue(exception.cause is LinkageError) + assertTrue(isClosedForSend) + expect(4) + latch.countDown() }.subscribe({ expect(2) throw LinkageError() - }, { expect(3) }) // Unreached because fatal errors are rethrown + }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw. + latch.await() finish(5) } @@ -100,7 +109,7 @@ class ObservableExceptionHandlingTest : TestBase() { }.subscribe({ expect(2) throw TestException() - }, { expect(3) }) // not reported to onError because came from the subscribe itself + }, { expect(3) }) finish(4) } @@ -119,7 +128,7 @@ class ObservableExceptionHandlingTest : TestBase() { } @Test - fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) { + fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) { rxObservable(Dispatchers.Unconfined) { expect(1) send(Unit) @@ -128,7 +137,7 @@ class ObservableExceptionHandlingTest : TestBase() { .subscribe({ expect(2) throw LinkageError() - }, { expect(3) }) - finish(5) + }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw. + finish(4) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt index 4454190f..e246407a 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt @@ -5,7 +5,9 @@ package kotlinx.coroutines.rx2 import io.reactivex.* +import io.reactivex.disposables.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* @@ -101,7 +103,7 @@ class ObservableSingleTest : TestBase() { @Test fun testAwaitFirstOrNull() { - val observable = rxObservable<String> { + val observable = rxObservable { send(Observable.empty<String>().awaitFirstOrNull() ?: "OK") } @@ -154,6 +156,32 @@ class ObservableSingleTest : TestBase() { } } + /** Tests that calls to [awaitFirst] (and, thus, the other methods) throw [CancellationException] and dispose of + * the subscription when their [Job] is cancelled. */ + @Test + fun testAwaitCancellation() = runTest { + expect(1) + val observable = ObservableSource<Int> { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + observable.awaitFirst() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) + } + + @Test fun testExceptionFromObservable() { val observable = rxObservable { diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt index 159f3729..0253fced 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt @@ -26,7 +26,7 @@ class ObservableSourceAsFlowStressTest : TestBase() { val latch = Channel<Unit>(1) var i = 0 val observable = Observable.interval(100L, TimeUnit.MICROSECONDS) - .doOnNext { if (++i > 100) latch.offer(Unit) } + .doOnNext { if (++i > 100) latch.trySend(Unit) } val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default)) latch.receive() job.cancelAndJoin() diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt index 3cd3bbff..2c22cbf0 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt @@ -1,12 +1,14 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.rx2 import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* import org.junit.Test +import kotlin.onSuccess import kotlin.test.* class ObservableSubscriptionSelectTest : TestBase() { @@ -18,27 +20,27 @@ class ObservableSubscriptionSelectTest : TestBase() { var a = 0 var b = 0 // open two subs - val channelA = source.openSubscription() - val channelB = source.openSubscription() + val channelA = source.toChannel() + val channelB = source.toChannel() loop@ while (true) { val done: Int = select { - channelA.onReceiveOrNull { - if (it != null) assertEquals(a++, it) - if (it == null) 0 else 1 + channelA.onReceiveCatching { result -> + result.onSuccess { assertEquals(a++, it) } + if (result.isSuccess) 1 else 0 } - channelB.onReceiveOrNull { - if (it != null) assertEquals(b++, it) - if (it == null) 0 else 2 + channelB.onReceiveCatching { result -> + result.onSuccess { assertEquals(b++, it) } + if (result.isSuccess) 2 else 0 } } when (done) { 0 -> break@loop 1 -> { - val r = channelB.receiveOrNull() + val r = channelB.receiveCatching().getOrNull() if (r != null) assertEquals(b++, r) } 2 -> { - val r = channelA.receiveOrNull() + val r = channelA.receiveCatching().getOrNull() if (r != null) assertEquals(a++, r) } } @@ -48,4 +50,4 @@ class ObservableSubscriptionSelectTest : TestBase() { // should receive one of them fully assertTrue(a == n || b == n) } -}
\ No newline at end of file +} diff --git a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt index c66188a1..b359d963 100644 --- a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt @@ -9,6 +9,7 @@ import io.reactivex.disposables.* import io.reactivex.exceptions.* import io.reactivex.functions.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* @@ -98,6 +99,31 @@ class SingleTest : TestBase() { assertEquals("OK", Single.just("O").await() + "K") } + /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their + * [Job] is cancelled. */ + @Test + fun testSingleAwaitCancellation() = runTest { + expect(1) + val single = SingleSource<Int> { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + single.await() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) + } + @Test fun testSingleEmitAndAwait() { val single = rxSingle { @@ -221,7 +247,7 @@ class SingleTest : TestBase() { fun testFatalExceptionInSingle() = runTest { rxSingle(Dispatchers.Unconfined) { throw LinkageError() - }.subscribe({ _, e -> assertTrue(e is LinkageError); expect(1) }) + }.subscribe { _, e -> assertTrue(e is LinkageError); expect(1) } finish(2) } |