aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-rx2
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-rx2')
-rw-r--r--reactive/kotlinx-coroutines-rx2/README.md41
-rw-r--r--reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api16
-rw-r--r--reactive/kotlinx-coroutines-rx2/build.gradle19
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxAwait.kt142
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt1
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxChannel.kt49
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt36
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxConvert.kt18
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt4
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt34
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxObservable.kt140
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxSingle.kt34
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt30
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt16
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt18
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt96
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/ObservableCollectTest.kt69
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt37
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt30
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt2
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt26
-rw-r--r--reactive/kotlinx-coroutines-rx2/test/SingleTest.kt28
22 files changed, 594 insertions, 292 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/README.md b/reactive/kotlinx-coroutines-rx2/README.md
index 40fe122f..d93f569a 100644
--- a/reactive/kotlinx-coroutines-rx2/README.md
+++ b/reactive/kotlinx-coroutines-rx2/README.md
@@ -25,9 +25,8 @@ Suspending extension functions and suspending iteration:
| **Name** | **Description**
| -------- | ---------------
| [CompletableSource.await][io.reactivex.CompletableSource.await] | Awaits for completion of the completable value
-| [MaybeSource.await][io.reactivex.MaybeSource.await] | Awaits for the value of the maybe and returns it or null
-| [MaybeSource.awaitOrDefault][io.reactivex.MaybeSource.awaitOrDefault] | Awaits for the value of the maybe and returns it or default
-| [MaybeSource.openSubscription][io.reactivex.MaybeSource.openSubscription] | Subscribes to maybe and returns [ReceiveChannel]
+| [MaybeSource.awaitSingle][io.reactivex.MaybeSource.awaitSingle] | Awaits for the value of the maybe and returns it or throws an exception
+| [MaybeSource.awaitSingleOrNull][io.reactivex.MaybeSource.awaitSingleOrNull] | Awaits for the value of the maybe and returns it or null
| [SingleSource.await][io.reactivex.SingleSource.await] | Awaits for completion of the single value and returns it
| [ObservableSource.awaitFirst][io.reactivex.ObservableSource.awaitFirst] | Awaits for the first value from the given observable
| [ObservableSource.awaitFirstOrDefault][io.reactivex.ObservableSource.awaitFirstOrDefault] | Awaits for the first value from the given observable or default
@@ -35,7 +34,6 @@ Suspending extension functions and suspending iteration:
| [ObservableSource.awaitFirstOrNull][io.reactivex.ObservableSource.awaitFirstOrNull] | Awaits for the first value from the given observable or null
| [ObservableSource.awaitLast][io.reactivex.ObservableSource.awaitFirst] | Awaits for the last value from the given observable
| [ObservableSource.awaitSingle][io.reactivex.ObservableSource.awaitSingle] | Awaits for the single value from the given observable
-| [ObservableSource.openSubscription][io.reactivex.ObservableSource.openSubscription] | Subscribes to observable and returns [ReceiveChannel]
Note that `Flowable` is a subclass of [Reactive Streams](https://www.reactive-streams.org)
`Publisher` and extensions for it are covered by
@@ -47,7 +45,6 @@ Conversion functions:
| -------- | ---------------
| [Job.asCompletable][kotlinx.coroutines.Job.asCompletable] | Converts job to hot completable
| [Deferred.asSingle][kotlinx.coroutines.Deferred.asSingle] | Converts deferred value to hot single
-| [ReceiveChannel.asObservable][kotlinx.coroutines.channels.ReceiveChannel.asObservable] | Converts streaming channel to hot observable
| [Scheduler.asCoroutineDispatcher][io.reactivex.Scheduler.asCoroutineDispatcher] | Converts scheduler to [CoroutineDispatcher]
<!--- MODULE kotlinx-coroutines-core -->
@@ -59,7 +56,6 @@ Conversion functions:
<!--- INDEX kotlinx.coroutines.channels -->
[ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-producer-scope/index.html
-[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html
<!--- INDEX kotlinx.coroutines.flow -->
@@ -73,24 +69,21 @@ Conversion functions:
[rxSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-single.html
[rxObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-observable.html
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-flowable.html
-[Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-flowable.html
-[Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.flow.-flow/as-observable.html
-[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/as-flow.html
-[io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-completable-source/await.html
-[io.reactivex.MaybeSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await.html
-[io.reactivex.MaybeSource.awaitOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/await-or-default.html
-[io.reactivex.MaybeSource.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-maybe-source/open-subscription.html
-[io.reactivex.SingleSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-single-source/await.html
-[io.reactivex.ObservableSource.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/await-first.html
-[io.reactivex.ObservableSource.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/await-first-or-default.html
-[io.reactivex.ObservableSource.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/await-first-or-else.html
-[io.reactivex.ObservableSource.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/await-first-or-null.html
-[io.reactivex.ObservableSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/await-single.html
-[io.reactivex.ObservableSource.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-observable-source/open-subscription.html
-[kotlinx.coroutines.Job.asCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-job/as-completable.html
-[kotlinx.coroutines.Deferred.asSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-deferred/as-single.html
-[kotlinx.coroutines.channels.ReceiveChannel.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.channels.-receive-channel/as-observable.html
-[io.reactivex.Scheduler.asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/io.reactivex.-scheduler/as-coroutine-dispatcher.html
+[Flow.asFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/as-flowable.html
+[Flow.asObservable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/as-observable.html
+[ObservableSource.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/as-flow.html
+[io.reactivex.CompletableSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/await.html
+[io.reactivex.MaybeSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/await-single.html
+[io.reactivex.MaybeSource.awaitSingleOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/await-single-or-null.html
+[io.reactivex.SingleSource.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/await.html
+[io.reactivex.ObservableSource.awaitFirst]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/await-first.html
+[io.reactivex.ObservableSource.awaitFirstOrDefault]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/await-first-or-default.html
+[io.reactivex.ObservableSource.awaitFirstOrElse]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/await-first-or-else.html
+[io.reactivex.ObservableSource.awaitFirstOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/await-first-or-null.html
+[io.reactivex.ObservableSource.awaitSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/await-single.html
+[kotlinx.coroutines.Job.asCompletable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/as-completable.html
+[kotlinx.coroutines.Deferred.asSingle]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/as-single.html
+[io.reactivex.Scheduler.asCoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/as-coroutine-dispatcher.html
<!--- END -->
diff --git a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api
index 4370325f..c27ef4d7 100644
--- a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api
+++ b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api
@@ -8,21 +8,23 @@ public final class kotlinx/coroutines/rx2/RxAwaitKt {
public static final fun awaitFirstOrNull (Lio/reactivex/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitLast (Lio/reactivex/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitOrDefault (Lio/reactivex/MaybeSource;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun awaitSingle (Lio/reactivex/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitSingle (Lio/reactivex/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun awaitSingleOrNull (Lio/reactivex/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}
public final class kotlinx/coroutines/rx2/RxChannelKt {
public static final fun collect (Lio/reactivex/MaybeSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collect (Lio/reactivex/ObservableSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
- public static final fun consumeEach (Lio/reactivex/MaybeSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
- public static final fun consumeEach (Lio/reactivex/ObservableSource;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun openSubscription (Lio/reactivex/MaybeSource;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun openSubscription (Lio/reactivex/ObservableSource;)Lkotlinx/coroutines/channels/ReceiveChannel;
+ public static final fun toChannel (Lio/reactivex/MaybeSource;)Lkotlinx/coroutines/channels/ReceiveChannel;
+ public static final fun toChannel (Lio/reactivex/ObservableSource;)Lkotlinx/coroutines/channels/ReceiveChannel;
}
public final class kotlinx/coroutines/rx2/RxCompletableKt {
public static final fun rxCompletable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Completable;
- public static final fun rxCompletable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Completable;
+ public static final synthetic fun rxCompletable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Completable;
public static synthetic fun rxCompletable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Completable;
public static synthetic fun rxCompletable$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Completable;
}
@@ -47,21 +49,21 @@ public final class kotlinx/coroutines/rx2/RxConvertKt {
public final class kotlinx/coroutines/rx2/RxFlowableKt {
public static final fun rxFlowable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Flowable;
- public static final fun rxFlowable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Flowable;
+ public static final synthetic fun rxFlowable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Flowable;
public static synthetic fun rxFlowable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Flowable;
public static synthetic fun rxFlowable$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Flowable;
}
public final class kotlinx/coroutines/rx2/RxMaybeKt {
public static final fun rxMaybe (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Maybe;
- public static final fun rxMaybe (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Maybe;
+ public static final synthetic fun rxMaybe (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Maybe;
public static synthetic fun rxMaybe$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Maybe;
public static synthetic fun rxMaybe$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Maybe;
}
public final class kotlinx/coroutines/rx2/RxObservableKt {
public static final fun rxObservable (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Observable;
- public static final fun rxObservable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Observable;
+ public static final synthetic fun rxObservable (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Observable;
public static synthetic fun rxObservable$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Observable;
public static synthetic fun rxObservable$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Observable;
}
@@ -72,7 +74,7 @@ public final class kotlinx/coroutines/rx2/RxSchedulerKt {
public final class kotlinx/coroutines/rx2/RxSingleKt {
public static final fun rxSingle (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Single;
- public static final fun rxSingle (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Single;
+ public static final synthetic fun rxSingle (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lio/reactivex/Single;
public static synthetic fun rxSingle$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Single;
public static synthetic fun rxSingle$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lio/reactivex/Single;
}
diff --git a/reactive/kotlinx-coroutines-rx2/build.gradle b/reactive/kotlinx-coroutines-rx2/build.gradle
index 73f76c3d..b6fd9327 100644
--- a/reactive/kotlinx-coroutines-rx2/build.gradle
+++ b/reactive/kotlinx-coroutines-rx2/build.gradle
@@ -1,18 +1,21 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
+import org.jetbrains.dokka.gradle.DokkaTaskPartial
dependencies {
- compile project(':kotlinx-coroutines-reactive')
- testCompile project(':kotlinx-coroutines-reactive').sourceSets.test.output
- testCompile "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"
- compile "io.reactivex.rxjava2:rxjava:$rxjava2_version"
+ api project(':kotlinx-coroutines-reactive')
+ testImplementation project(':kotlinx-coroutines-reactive').sourceSets.test.output
+ testImplementation "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"
+ api "io.reactivex.rxjava2:rxjava:$rxjava2_version"
}
-tasks.withType(dokka.getClass()) {
- externalDocumentationLink {
- url = new URL('http://reactivex.io/RxJava/2.x/javadoc/')
- packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
+tasks.withType(DokkaTaskPartial.class) {
+ dokkaSourceSets.configureEach {
+ externalDocumentationLink {
+ url.set(new URL('http://reactivex.io/RxJava/2.x/javadoc/'))
+ packageListUrl.set(projectDir.toPath().resolve("package.list").toUri().toURL())
+ }
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt
index 6e162c9a..0e0b47eb 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt
@@ -15,11 +15,12 @@ import kotlin.coroutines.*
// ------------------------ CompletableSource ------------------------
/**
- * Awaits for completion of this completable without blocking a thread.
- * Returns `Unit` or throws the corresponding exception if this completable had produced error.
+ * Awaits for completion of this completable without blocking the thread.
+ * Returns `Unit`, or throws the corresponding exception if this completable produces an error.
*
* This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
- * suspending function is suspended, this function immediately resumes with [CancellationException].
+ * suspending function is suspended, this function immediately resumes with [CancellationException] and disposes of its
+ * subscription.
*/
public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
subscribe(object : CompletableObserver {
@@ -32,6 +33,37 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
// ------------------------ MaybeSource ------------------------
/**
+ * Awaits for completion of the [MaybeSource] without blocking the thread.
+ * Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this
+ * [MaybeSource] produces an error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
+ * function immediately resumes with [CancellationException] and disposes of its subscription.
+ */
+@Suppress("UNCHECKED_CAST")
+public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
+ subscribe(object : MaybeObserver<T> {
+ override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
+ override fun onComplete() { cont.resume(null) }
+ override fun onSuccess(t: T) { cont.resume(t) }
+ override fun onError(error: Throwable) { cont.resumeWithException(error) }
+ })
+}
+
+/**
+ * Awaits for completion of the [MaybeSource] without blocking the thread.
+ * Returns the resulting value, or throws if either no value is produced or this [MaybeSource] produces an error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
+ * function immediately resumes with [CancellationException] and disposes of its subscription.
+ *
+ * @throws NoSuchElementException if no elements were produced by this [MaybeSource].
+ */
+public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
+
+/**
* Awaits for completion of the maybe without blocking a thread.
* Returns the resulting value, null if no value was produced or throws the corresponding exception if this
* maybe had produced error.
@@ -39,9 +71,19 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
+ *
+ * ### Deprecation
+ *
+ * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of
+ * a value, as opposed to throwing in such case.
+ * @suppress
*/
-@Suppress("UNCHECKED_CAST")
-public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).awaitOrDefault(null)
+@Deprecated(
+ message = "Deprecated in favor of awaitSingleOrNull()",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("this.awaitSingleOrNull()")
+) // Warning since 1.5, error in 1.6, hidden in 1.7
+public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()
/**
* Awaits for completion of the maybe without blocking a thread.
@@ -51,25 +93,29 @@ public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).aw
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
+ *
+ * ### Deprecation
+ *
+ * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for
+ * details).
+ * @suppress
*/
-public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont ->
- subscribe(object : MaybeObserver<T> {
- override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
- override fun onComplete() { cont.resume(default) }
- override fun onSuccess(t: T) { cont.resume(t) }
- override fun onError(error: Throwable) { cont.resumeWithException(error) }
- })
-}
+@Deprecated(
+ message = "Deprecated in favor of awaitSingleOrNull()",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
+) // Warning since 1.5, error in 1.6, hidden in 1.7
+public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default
// ------------------------ SingleSource ------------------------
/**
- * Awaits for completion of the single value without blocking a thread.
- * Returns the resulting value or throws the corresponding exception if this single had produced error.
+ * Awaits for completion of the single value response without blocking the thread.
+ * Returns the resulting value, or throws the corresponding exception if this response produces an error.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
subscribe(object : SingleObserver<T> {
@@ -82,69 +128,73 @@ public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine
// ------------------------ ObservableSource ------------------------
/**
- * Awaits for the first value from the given observable without blocking a thread.
- * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the first value from the given [Observable] without blocking the thread and returns the resulting value, or,
+ * if the observable has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*
- * @throws NoSuchElementException if observable does not emit any value
+ * @throws NoSuchElementException if the observable does not emit any value
*/
public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
/**
- * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
- * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without
+ * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
+ * corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
/**
- * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
- * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the
+ * thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding
+ * exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
/**
- * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
- * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted,
+ * without blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws
+ * the corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*/
-public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
+public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T =
+ awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
/**
- * Awaits for the last value from the given observable without blocking a thread.
- * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the last value from the given [Observable] without blocking the thread and
+ * returns the resulting value, or, if this observable has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*
- * @throws NoSuchElementException if observable does not emit any value
+ * @throws NoSuchElementException if the observable does not emit any value
*/
public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
/**
- * Awaits for the single value from the given observable without blocking a thread.
- * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or,
+ * if this observable has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*
- * @throws NoSuchElementException if observable does not emit any value
- * @throws IllegalArgumentException if observable emits more than one value
+ * @throws NoSuchElementException if the observable does not emit any value
+ * @throws IllegalArgumentException if the observable emits more than one value
*/
public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt
index 0fe43f1c..3e39033e 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt
@@ -20,6 +20,7 @@ internal fun handleUndeliverableException(cause: Throwable, context: CoroutineCo
try {
RxJavaPlugins.onError(cause)
} catch (e: Throwable) {
+ cause.addSuppressed(e)
handleCoroutineException(context, cause)
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt
index a129196a..bb093b07 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt
@@ -9,6 +9,8 @@ import io.reactivex.disposables.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
/**
* Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it.
@@ -16,8 +18,9 @@ import kotlinx.coroutines.internal.*
*
* This API is deprecated in the favour of [Flow].
* [MaybeSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first.
+ * @suppress
*/
-@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.WARNING) // Will be hidden in 1.4
+@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.ERROR) // Will be hidden in 1.5
public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
@@ -30,37 +33,46 @@ public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> {
*
* This API is deprecated in the favour of [Flow].
* [ObservableSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first.
+ * @suppress
*/
-@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.WARNING) // Will be hidden in 1.4
+@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.ERROR) // Will be hidden in 1.5
public fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
val channel = SubscriptionChannel<T>()
subscribe(channel)
return channel
}
-// Will be promoted to error in 1.3.0, removed in 1.4.0
-@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)"))
-public suspend inline fun <T> MaybeSource<T>.consumeEach(action: (T) -> Unit): Unit =
- openSubscription().consumeEach(action)
-
-// Will be promoted to error in 1.3.0, removed in 1.4.0
-@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)"))
-public suspend inline fun <T> ObservableSource<T>.consumeEach(action: (T) -> Unit): Unit =
- openSubscription().consumeEach(action)
-
/**
* Subscribes to this [MaybeSource] and performs the specified action for each received element.
- * Cancels subscription if any exception happens during collect.
+ *
+ * If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown from
+ * [collect].
*/
public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit =
- openSubscription().consumeEach(action)
+ toChannel().consumeEach(action)
/**
* Subscribes to this [ObservableSource] and performs the specified action for each received element.
- * Cancels subscription if any exception happens during collect.
+ *
+ * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
+ * [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect].
*/
public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit =
- openSubscription().consumeEach(action)
+ toChannel().consumeEach(action)
+
+@PublishedApi
+internal fun <T> MaybeSource<T>.toChannel(): ReceiveChannel<T> {
+ val channel = SubscriptionChannel<T>()
+ subscribe(channel)
+ return channel
+}
+
+@PublishedApi
+internal fun <T> ObservableSource<T>.toChannel(): ReceiveChannel<T> {
+ val channel = SubscriptionChannel<T>()
+ subscribe(channel)
+ return channel
+}
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
private class SubscriptionChannel<T> :
@@ -79,11 +91,12 @@ private class SubscriptionChannel<T> :
}
override fun onSuccess(t: T) {
- offer(t)
+ trySend(t)
+ close(cause = null)
}
override fun onNext(t: T) {
- offer(t)
+ trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
}
override fun onComplete() {
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt
index d0a43fb1..3f915382 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt
@@ -2,14 +2,11 @@
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
-
package kotlinx.coroutines.rx2
import io.reactivex.*
import kotlinx.coroutines.*
import kotlin.coroutines.*
-import kotlin.internal.*
/**
* Creates cold [Completable] that runs a given [block] in a coroutine and emits its result.
@@ -28,17 +25,6 @@ public fun rxCompletable(
return rxCompletableInternal(GlobalScope, context, block)
}
-@Deprecated(
- message = "CoroutineScope.rxCompletable is deprecated in favour of top-level rxCompletable",
- level = DeprecationLevel.ERROR,
- replaceWith = ReplaceWith("rxCompletable(context, block)")
-) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
-@LowPriorityInOverloadResolution
-public fun CoroutineScope.rxCompletable(
- context: CoroutineContext = EmptyCoroutineContext,
- block: suspend CoroutineScope.() -> Unit
-): Completable = rxCompletableInternal(this, context, block)
-
private fun rxCompletableInternal(
scope: CoroutineScope, // support for legacy rxCompletable in scope
context: CoroutineContext,
@@ -53,7 +39,7 @@ private fun rxCompletableInternal(
private class RxCompletableCoroutine(
parentContext: CoroutineContext,
private val subscriber: CompletableEmitter
-) : AbstractCoroutine<Unit>(parentContext, true) {
+) : AbstractCoroutine<Unit>(parentContext, false, true) {
override fun onCompleted(value: Unit) {
try {
subscriber.onComplete()
@@ -64,11 +50,25 @@ private class RxCompletableCoroutine(
override fun onCancelled(cause: Throwable, handled: Boolean) {
try {
- if (!subscriber.tryOnError(cause)) {
- handleUndeliverableException(cause, context)
+ if (subscriber.tryOnError(cause)) {
+ return
}
} catch (e: Throwable) {
- handleUndeliverableException(e, context)
+ cause.addSuppressed(e)
}
+ handleUndeliverableException(cause, context)
}
}
+
+/**
+ * @suppress
+ */
+@Deprecated(
+ message = "CoroutineScope.rxCompletable is deprecated in favour of top-level rxCompletable",
+ level = DeprecationLevel.HIDDEN,
+ replaceWith = ReplaceWith("rxCompletable(context, block)")
+) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
+public fun CoroutineScope.rxCompletable(
+ context: CoroutineContext = EmptyCoroutineContext,
+ block: suspend CoroutineScope.() -> Unit
+): Completable = rxCompletableInternal(this, context, block)
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index 14c24942..497c922c 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -26,7 +26,6 @@ import kotlin.coroutines.*
*
* @param context -- the coroutine context from which the resulting completable is going to be signalled
*/
-@ExperimentalCoroutinesApi
public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) {
this@asCompletable.join()
}
@@ -43,7 +42,6 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet
*
* @param context -- the coroutine context from which the resulting maybe is going to be signalled
*/
-@ExperimentalCoroutinesApi
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) {
this@asMaybe.await()
}
@@ -60,7 +58,6 @@ public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMay
*
* @param context -- the coroutine context from which the resulting single is going to be signalled
*/
-@ExperimentalCoroutinesApi
public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle(context) {
this@asSingle.await()
}
@@ -75,17 +72,20 @@ public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T>
* resulting flow to specify a user-defined value and to control what happens when data is produced faster
* than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
*/
-@ExperimentalCoroutinesApi
public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
val disposableRef = AtomicReference<Disposable>()
val observer = object : Observer<T> {
override fun onComplete() { close() }
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
override fun onNext(t: T) {
+ /*
+ * Channel was closed by the downstream, so the exception (if any)
+ * also was handled by the same downstream
+ */
try {
- sendBlocking(t)
- } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
- // Is handled by the downstream flow
+ trySendBlocking(t)
+ } catch (e: InterruptedException) {
+ // RxJava interrupts the source
}
}
override fun onError(e: Throwable) { close(e) }
@@ -104,7 +104,6 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
-@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
/*
* ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
@@ -137,7 +136,6 @@ public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCorout
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
-@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
Flowable.fromPublisher(asPublisher(context))
@@ -151,6 +149,7 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext):
send(t)
}
+/** @suppress **/
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@@ -158,6 +157,7 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext):
public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
asFlowable(context)
+/** @suppress **/
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt
index 8dfe9576..c856bb4e 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt
@@ -31,7 +31,6 @@ import kotlin.internal.*
*
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
*/
-@ExperimentalCoroutinesApi
public fun <T: Any> rxFlowable(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
@@ -41,9 +40,10 @@ public fun <T: Any> rxFlowable(
return Flowable.fromPublisher(publishInternal(GlobalScope, context, RX_HANDLER, block))
}
+/** @suppress */
@Deprecated(
message = "CoroutineScope.rxFlowable is deprecated in favour of top-level rxFlowable",
- level = DeprecationLevel.ERROR,
+ level = DeprecationLevel.HIDDEN,
replaceWith = ReplaceWith("rxFlowable(context, block)")
) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
@LowPriorityInOverloadResolution
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt
index f5ed48b9..ab713123 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt
@@ -2,14 +2,11 @@
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
-
package kotlinx.coroutines.rx2
import io.reactivex.*
import kotlinx.coroutines.*
import kotlin.coroutines.*
-import kotlin.internal.*
/**
* Creates cold [maybe][Maybe] that will run a given [block] in a coroutine and emits its result.
@@ -29,17 +26,6 @@ public fun <T> rxMaybe(
return rxMaybeInternal(GlobalScope, context, block)
}
-@Deprecated(
- message = "CoroutineScope.rxMaybe is deprecated in favour of top-level rxMaybe",
- level = DeprecationLevel.ERROR,
- replaceWith = ReplaceWith("rxMaybe(context, block)")
-) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
-@LowPriorityInOverloadResolution
-public fun <T> CoroutineScope.rxMaybe(
- context: CoroutineContext = EmptyCoroutineContext,
- block: suspend CoroutineScope.() -> T?
-): Maybe<T> = rxMaybeInternal(this, context, block)
-
private fun <T> rxMaybeInternal(
scope: CoroutineScope, // support for legacy rxMaybe in scope
context: CoroutineContext,
@@ -54,7 +40,7 @@ private fun <T> rxMaybeInternal(
private class RxMaybeCoroutine<T>(
parentContext: CoroutineContext,
private val subscriber: MaybeEmitter<T>
-) : AbstractCoroutine<T>(parentContext, true) {
+) : AbstractCoroutine<T>(parentContext, false, true) {
override fun onCompleted(value: T) {
try {
if (value == null) subscriber.onComplete() else subscriber.onSuccess(value)
@@ -65,11 +51,23 @@ private class RxMaybeCoroutine<T>(
override fun onCancelled(cause: Throwable, handled: Boolean) {
try {
- if (!subscriber.tryOnError(cause)) {
- handleUndeliverableException(cause, context)
+ if (subscriber.tryOnError(cause)) {
+ return
}
} catch (e: Throwable) {
- handleUndeliverableException(e, context)
+ cause.addSuppressed(e)
}
+ handleUndeliverableException(cause, context)
}
}
+
+/** @suppress */
+@Deprecated(
+ message = "CoroutineScope.rxMaybe is deprecated in favour of top-level rxMaybe",
+ level = DeprecationLevel.HIDDEN,
+ replaceWith = ReplaceWith("rxMaybe(context, block)")
+) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
+public fun <T> CoroutineScope.rxMaybe(
+ context: CoroutineContext = EmptyCoroutineContext,
+ block: suspend CoroutineScope.() -> T?
+): Maybe<T> = rxMaybeInternal(this, context, block)
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
index 6d11cb9c..5f409815 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
@@ -2,8 +2,6 @@
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
-
package kotlinx.coroutines.rx2
import io.reactivex.*
@@ -11,10 +9,10 @@ import io.reactivex.exceptions.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
-import kotlin.internal.*
/**
* Creates cold [observable][Observable] that will run a given [block] in a coroutine.
@@ -32,7 +30,6 @@ import kotlin.internal.*
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
*/
-@ExperimentalCoroutinesApi
public fun <T : Any> rxObservable(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
@@ -42,17 +39,6 @@ public fun <T : Any> rxObservable(
return rxObservableInternal(GlobalScope, context, block)
}
-@Deprecated(
- message = "CoroutineScope.rxObservable is deprecated in favour of top-level rxObservable",
- level = DeprecationLevel.ERROR,
- replaceWith = ReplaceWith("rxObservable(context, block)")
-) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
-@LowPriorityInOverloadResolution
-public fun <T : Any> CoroutineScope.rxObservable(
- context: CoroutineContext = EmptyCoroutineContext,
- @BuilderInference block: suspend ProducerScope<T>.() -> Unit
-): Observable<T> = rxObservableInternal(this, context, block)
-
private fun <T : Any> rxObservableInternal(
scope: CoroutineScope, // support for legacy rxObservable in scope
context: CoroutineContext,
@@ -68,39 +54,35 @@ private const val OPEN = 0 // open channel, still working
private const val CLOSED = -1 // closed, but have not signalled onCompleted/onError yet
private const val SIGNALLED = -2 // already signalled subscriber onCompleted/onError
-private class RxObservableCoroutine<T: Any>(
+private class RxObservableCoroutine<T : Any>(
parentContext: CoroutineContext,
private val subscriber: ObservableEmitter<T>
-) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> {
+) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> {
override val channel: SendChannel<T> get() = this
- // Mutex is locked when while subscriber.onXXX is being invoked
+ // Mutex is locked while subscriber.onXXX is being invoked
private val mutex = Mutex()
private val _signal = atomic(OPEN)
- override val isClosedForSend: Boolean get() = isCompleted
- override val isFull: Boolean = mutex.isLocked
+ override val isClosedForSend: Boolean get() = !isActive
override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
override fun invokeOnClose(handler: (Throwable?) -> Unit) =
throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose")
- override fun offer(element: T): Boolean {
- if (!mutex.tryLock()) return false
- doLockedNext(element)
- return true
- }
+ override fun trySend(element: T): ChannelResult<Unit> =
+ if (!mutex.tryLock()) {
+ ChannelResult.failure()
+ } else {
+ when (val throwable = doLockedNext(element)) {
+ null -> ChannelResult.success(Unit)
+ else -> ChannelResult.closed(throwable)
+ }
+ }
public override suspend fun send(element: T) {
- // fast-path -- try send without suspension
- if (offer(element)) return
- // slow-path does suspend
- return sendSuspend(element)
- }
-
- private suspend fun sendSuspend(element: T) {
mutex.lock()
- doLockedNext(element)
+ doLockedNext(element)?.let { throw it }
}
override val onSend: SelectClause2<T, SendChannel<T>>
@@ -108,30 +90,39 @@ private class RxObservableCoroutine<T: Any>(
// registerSelectSend
@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
- override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
+ override fun <R> registerSelectClause2(
+ select: SelectInstance<R>,
+ element: T,
+ block: suspend (SendChannel<T>) -> R
+ ) {
mutex.onLock.registerSelectClause2(select, null) {
- doLockedNext(element)
+ doLockedNext(element)?.let { throw it }
block(this)
}
}
// assert: mutex.isLocked()
- private fun doLockedNext(elem: T) {
+ private fun doLockedNext(elem: T): Throwable? {
// check if already closed for send
if (!isActive) {
doLockedSignalCompleted(completionCause, completionCauseHandled)
- throw getCancellationException()
+ return getCancellationException()
}
// notify subscriber
try {
subscriber.onNext(elem)
} catch (e: Throwable) {
- // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it
- // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
- // this failure is essentially equivalent to a failure of a child coroutine.
- cancelCoroutine(e)
- mutex.unlock()
- throw e
+ val cause = UndeliverableException(e)
+ val causeDelivered = close(cause)
+ unlockAndCheckCompleted()
+ return if (causeDelivered) {
+ // `cause` is the reason this channel is closed
+ cause
+ } else {
+ // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception.
+ handleUndeliverableException(cause, context)
+ getCancellationException()
+ }
}
/*
* There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
@@ -140,6 +131,7 @@ private class RxObservableCoroutine<T: Any>(
* We have to recheck `isCompleted` after `unlock` anyway.
*/
unlockAndCheckCompleted()
+ return null
}
private fun unlockAndCheckCompleted() {
@@ -153,33 +145,31 @@ private class RxObservableCoroutine<T: Any>(
private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
// cancellation failures
try {
- if (_signal.value >= CLOSED) {
- _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
+ if (_signal.value == SIGNALLED)
+ return
+ _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
+ @Suppress("INVISIBLE_MEMBER")
+ val unwrappedCause = cause?.let { unwrap(it) }
+ if (unwrappedCause == null) {
try {
- if (cause != null && cause !is CancellationException) {
- /*
- * Reactive frameworks have two types of exceptions: regular and fatal.
- * Regular are passed to onError.
- * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297).
- * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
- * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
- * thrown by subscriber or upstream).
- * To make behaviour consistent and least surprising, we always handle fatal exceptions
- * by coroutines machinery, anyway, they should not be present in regular program flow,
- * thus our goal here is just to expose it as soon as possible.
- */
- subscriber.tryOnError(cause)
- if (!handled && cause.isFatal()) {
- handleUndeliverableException(cause, context)
- }
- }
- else {
- subscriber.onComplete()
- }
- } catch (e: Throwable) {
- // Unhandled exception (cannot handle in other way, since we are already complete)
+ subscriber.onComplete()
+ } catch (e: Exception) {
handleUndeliverableException(e, context)
}
+ } else if (unwrappedCause is UndeliverableException && !handled) {
+ /** Such exceptions are not reported to `onError`, as, according to the reactive specifications,
+ * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already
+ * cancelled. */
+ handleUndeliverableException(cause, context)
+ } else if (unwrappedCause !== getCancellationException() || !subscriber.isDisposed) {
+ try {
+ /** If the subscriber is already in a terminal state, the error will be signalled to
+ * `RxJavaPlugins.onError`. */
+ subscriber.onError(cause)
+ } catch (e: Exception) {
+ cause.addSuppressed(e)
+ handleUndeliverableException(cause, context)
+ }
}
} finally {
mutex.unlock()
@@ -201,9 +191,13 @@ private class RxObservableCoroutine<T: Any>(
}
}
-internal fun Throwable.isFatal() = try {
- Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode
- false
-} catch (e: Throwable) {
- true
-}
+/** @suppress */
+@Deprecated(
+ message = "CoroutineScope.rxObservable is deprecated in favour of top-level rxObservable",
+ level = DeprecationLevel.HIDDEN,
+ replaceWith = ReplaceWith("rxObservable(context, block)")
+) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
+public fun <T : Any> CoroutineScope.rxObservable(
+ context: CoroutineContext = EmptyCoroutineContext,
+ @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Observable<T> = rxObservableInternal(this, context, block)
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
index b8012b6d..27842a21 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt
@@ -2,14 +2,11 @@
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
-
package kotlinx.coroutines.rx2
import io.reactivex.*
import kotlinx.coroutines.*
import kotlin.coroutines.*
-import kotlin.internal.*
/**
* Creates cold [single][Single] that will run a given [block] in a coroutine and emits its result.
@@ -28,17 +25,6 @@ public fun <T : Any> rxSingle(
return rxSingleInternal(GlobalScope, context, block)
}
-@Deprecated(
- message = "CoroutineScope.rxSingle is deprecated in favour of top-level rxSingle",
- level = DeprecationLevel.ERROR,
- replaceWith = ReplaceWith("rxSingle(context, block)")
-) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
-@LowPriorityInOverloadResolution
-public fun <T : Any> CoroutineScope.rxSingle(
- context: CoroutineContext = EmptyCoroutineContext,
- block: suspend CoroutineScope.() -> T
-): Single<T> = rxSingleInternal(this, context, block)
-
private fun <T : Any> rxSingleInternal(
scope: CoroutineScope, // support for legacy rxSingle in scope
context: CoroutineContext,
@@ -53,7 +39,7 @@ private fun <T : Any> rxSingleInternal(
private class RxSingleCoroutine<T: Any>(
parentContext: CoroutineContext,
private val subscriber: SingleEmitter<T>
-) : AbstractCoroutine<T>(parentContext, true) {
+) : AbstractCoroutine<T>(parentContext, false, true) {
override fun onCompleted(value: T) {
try {
subscriber.onSuccess(value)
@@ -64,11 +50,23 @@ private class RxSingleCoroutine<T: Any>(
override fun onCancelled(cause: Throwable, handled: Boolean) {
try {
- if (!subscriber.tryOnError(cause)) {
- handleUndeliverableException(cause, context)
+ if (subscriber.tryOnError(cause)) {
+ return
}
} catch (e: Throwable) {
- handleUndeliverableException(e, context)
+ cause.addSuppressed(e)
}
+ handleUndeliverableException(cause, context)
}
}
+
+/** @suppress */
+@Deprecated(
+ message = "CoroutineScope.rxSingle is deprecated in favour of top-level rxSingle",
+ level = DeprecationLevel.HIDDEN,
+ replaceWith = ReplaceWith("rxSingle(context, block)")
+) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
+public fun <T : Any> CoroutineScope.rxSingle(
+ context: CoroutineContext = EmptyCoroutineContext,
+ block: suspend CoroutineScope.() -> T
+): Single<T> = rxSingleInternal(this, context, block)
diff --git a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt
index 298b32bf..16f0005b 100644
--- a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt
@@ -98,6 +98,31 @@ class CompletableTest : TestBase() {
}
}
+ /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their [Job] is
+ * cancelled. */
+ @Test
+ fun testAwaitCancellation() = runTest {
+ expect(1)
+ val completable = CompletableSource { s ->
+ s.onSubscribe(object: Disposable {
+ override fun dispose() { expect(4) }
+ override fun isDisposed(): Boolean { expectUnreached(); return false }
+ })
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ try {
+ expect(2)
+ completable.await()
+ } catch (e: CancellationException) {
+ expect(5)
+ throw e
+ }
+ }
+ expect(3)
+ job.cancelAndJoin()
+ finish(6)
+ }
+
@Test
fun testSuppressedException() = runTest {
val completable = rxCompletable(currentDispatcher()) {
@@ -119,7 +144,7 @@ class CompletableTest : TestBase() {
}
@Test
- fun testUnhandledException() = runTest() {
+ fun testUnhandledException() = runTest {
expect(1)
var disposable: Disposable? = null
val handler = { e: Throwable ->
@@ -165,8 +190,7 @@ class CompletableTest : TestBase() {
withExceptionHandler(handler) {
rxCompletable(Dispatchers.Unconfined) {
expect(1)
- 42
- }.subscribe({ throw LinkageError() })
+ }.subscribe { throw LinkageError() }
finish(3)
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt
index 05b7ee92..31643929 100644
--- a/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/FlowableExceptionHandlingTest.kt
@@ -38,16 +38,16 @@ class FlowableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) {
+ fun testFatalException() = withExceptionHandler({ expectUnreached() }) {
rxFlowable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
}.subscribe({
expectUnreached()
}, {
- expect(2) // Fatal exception is reported to both onError and CEH
+ expect(2) // Fatal exceptions are not treated as special
})
- finish(4)
+ finish(3)
}
@Test
@@ -66,7 +66,7 @@ class FlowableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) {
+ fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
rxFlowable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
@@ -77,19 +77,19 @@ class FlowableExceptionHandlingTest : TestBase() {
}, {
expect(2)
})
- finish(4)
+ finish(3)
}
@Test
- fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
rxFlowable(Dispatchers.Unconfined) {
expect(1)
send(Unit)
}.subscribe({
expect(2)
throw LinkageError()
- }, { expect(3) }) // Fatal exception is reported to both onError and CEH
- finish(5)
+ }, { expectUnreached() }) // Fatal exception is rethrown from `onNext` => the subscription is thought to be cancelled
+ finish(4)
}
@Test
diff --git a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
index 540fa76b..8a6362ad 100644
--- a/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/IntegrationTest.kt
@@ -7,6 +7,7 @@ package kotlinx.coroutines.rx2
import io.reactivex.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.*
import org.junit.Test
import org.junit.runner.*
import org.junit.runners.*
@@ -92,7 +93,7 @@ class IntegrationTest(
assertEquals(n, observable.awaitLast())
assertFailsWith<IllegalArgumentException> { observable.awaitSingle() }
checkNumbers(n, observable)
- val channel = observable.openSubscription()
+ val channel = observable.toChannel()
checkNumbers(n, channel.consumeAsFlow().asObservable(ctx(coroutineContext)))
channel.cancel()
}
@@ -124,6 +125,21 @@ class IntegrationTest(
finish(3)
}
+ @Test
+ fun testObservableWithTimeout() = runTest {
+ val observable = rxObservable<Int> {
+ expect(2)
+ withTimeout(1) { delay(100) }
+ }
+ try {
+ expect(1)
+ observable.awaitFirstOrNull()
+ } catch (e: CancellationException) {
+ expect(3)
+ }
+ finish(4)
+ }
+
private suspend fun checkNumbers(n: Int, observable: Observable<Int>) {
var last = 0
observable.collect {
diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt
index 08427dcf..f5d128d3 100644
--- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt
@@ -7,13 +7,12 @@ package kotlinx.coroutines.rx2
import io.reactivex.*
import io.reactivex.disposables.*
import io.reactivex.exceptions.*
-import io.reactivex.functions.*
import io.reactivex.internal.functions.Functions.*
import kotlinx.coroutines.*
+import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
-import java.util.concurrent.CancellationException
import kotlin.test.*
class MaybeTest : TestBase() {
@@ -47,7 +46,7 @@ class MaybeTest : TestBase() {
null
}
expect(2)
- maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, Action {
+ maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, {
expect(5)
})
expect(3)
@@ -112,18 +111,45 @@ class MaybeTest : TestBase() {
@Test
fun testMaybeAwait() = runBlocking {
- assertEquals("OK", Maybe.just("O").await() + "K")
+ assertEquals("OK", Maybe.just("O").awaitSingleOrNull() + "K")
+ assertEquals("OK", Maybe.just("O").awaitSingle() + "K")
}
@Test
- fun testMaybeAwaitForNull() = runBlocking {
- assertNull(Maybe.empty<String>().await())
+ fun testMaybeAwaitForNull(): Unit = runBlocking {
+ assertNull(Maybe.empty<String>().awaitSingleOrNull())
+ assertFailsWith<NoSuchElementException> { Maybe.empty<String>().awaitSingle() }
+ }
+
+ /** Tests that calls to [awaitSingleOrNull] throw [CancellationException] and dispose of the subscription when their
+ * [Job] is cancelled. */
+ @Test
+ fun testMaybeAwaitCancellation() = runTest {
+ expect(1)
+ val maybe = MaybeSource<Int> { s ->
+ s.onSubscribe(object: Disposable {
+ override fun dispose() { expect(4) }
+ override fun isDisposed(): Boolean { expectUnreached(); return false }
+ })
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ try {
+ expect(2)
+ maybe.awaitSingleOrNull()
+ } catch (e: CancellationException) {
+ expect(5)
+ throw e
+ }
+ }
+ expect(3)
+ job.cancelAndJoin()
+ finish(6)
}
@Test
fun testMaybeEmitAndAwait() {
val maybe = rxMaybe {
- Maybe.just("O").await() + "K"
+ Maybe.just("O").awaitSingleOrNull() + "K"
}
checkMaybeValue(maybe) {
@@ -205,7 +231,7 @@ class MaybeTest : TestBase() {
@Test
fun testCancelledConsumer() = runTest {
expect(1)
- val maybe = rxMaybe<Int>(currentDispatcher()) {
+ val maybe = rxMaybe(currentDispatcher()) {
expect(4)
try {
delay(Long.MAX_VALUE)
@@ -228,6 +254,56 @@ class MaybeTest : TestBase() {
finish(7)
}
+ /** Tests the simple scenario where the Maybe doesn't output a value. */
+ @Test
+ fun testMaybeCollectEmpty() = runTest {
+ expect(1)
+ Maybe.empty<Int>().collect {
+ expectUnreached()
+ }
+ finish(2)
+ }
+
+ /** Tests the simple scenario where the Maybe doesn't output a value. */
+ @Test
+ fun testMaybeCollectSingle() = runTest {
+ expect(1)
+ Maybe.just("OK").collect {
+ assertEquals("OK", it)
+ expect(2)
+ }
+ finish(3)
+ }
+
+ /** Tests the behavior of [collect] when the Maybe raises an error. */
+ @Test
+ fun testMaybeCollectThrowingMaybe() = runTest {
+ expect(1)
+ try {
+ Maybe.error<Int>(TestException()).collect {
+ expectUnreached()
+ }
+ } catch (e: TestException) {
+ expect(2)
+ }
+ finish(3)
+ }
+
+ /** Tests the behavior of [collect] when the action throws. */
+ @Test
+ fun testMaybeCollectThrowingAction() = runTest {
+ expect(1)
+ try {
+ Maybe.just("OK").collect {
+ expect(2)
+ throw TestException()
+ }
+ } catch (e: TestException) {
+ expect(3)
+ }
+ finish(4)
+ }
+
@Test
fun testSuppressedException() = runTest {
val maybe = rxMaybe(currentDispatcher()) {
@@ -241,7 +317,7 @@ class MaybeTest : TestBase() {
}
}
try {
- maybe.await()
+ maybe.awaitSingleOrNull()
expectUnreached()
} catch (e: TestException) {
assertTrue(e.suppressed[0] is TestException2)
@@ -301,7 +377,7 @@ class MaybeTest : TestBase() {
rxMaybe(Dispatchers.Unconfined) {
expect(1)
42
- }.subscribe({ throw LinkageError() })
+ }.subscribe { throw LinkageError() }
finish(3)
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableCollectTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableCollectTest.kt
new file mode 100644
index 00000000..508f594a
--- /dev/null
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableCollectTest.kt
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.rx2
+
+import io.reactivex.*
+import io.reactivex.disposables.*
+import kotlinx.coroutines.*
+import org.junit.Test
+import kotlin.test.*
+
+class ObservableCollectTest: TestBase() {
+
+ /** Tests the behavior of [collect] when the publisher raises an error. */
+ @Test
+ fun testObservableCollectThrowingObservable() = runTest {
+ expect(1)
+ var sum = 0
+ try {
+ rxObservable {
+ for (i in 0..100) {
+ send(i)
+ }
+ throw TestException()
+ }.collect {
+ sum += it
+ }
+ } catch (e: TestException) {
+ assertTrue(sum > 0)
+ finish(2)
+ }
+ }
+
+ /** Tests the behavior of [collect] when the action throws. */
+ @Test
+ fun testObservableCollectThrowingAction() = runTest {
+ expect(1)
+ var sum = 0
+ val expectedSum = 5
+ try {
+ var disposed = false
+ ObservableSource<Int> { observer ->
+ launch(Dispatchers.Default) {
+ observer.onSubscribe(object : Disposable {
+ override fun dispose() {
+ disposed = true
+ expect(expectedSum + 2)
+ }
+
+ override fun isDisposed(): Boolean = disposed
+ })
+ while (!disposed) {
+ observer.onNext(1)
+ }
+ }
+ }.collect {
+ expect(sum + 2)
+ sum += it
+ if (sum == expectedSum) {
+ throw TestException()
+ }
+ }
+ } catch (e: TestException) {
+ assertEquals(expectedSum, sum)
+ finish(expectedSum + 3)
+ }
+ }
+} \ No newline at end of file
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt
index d6cdd3ca..fb3d0f69 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt
@@ -8,6 +8,7 @@ import io.reactivex.exceptions.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
+import java.util.concurrent.*
import kotlin.test.*
class ObservableExceptionHandlingTest : TestBase() {
@@ -18,7 +19,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}
private inline fun <reified T : Throwable> handler(expect: Int) = { t: Throwable ->
- assertTrue(t is UndeliverableException && t.cause is T)
+ assertTrue(t is UndeliverableException && t.cause is T, "$t")
expect(expect)
}
@@ -38,8 +39,8 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalException() = withExceptionHandler(handler<LinkageError>(3)) {
- rxObservable<Int>(Dispatchers.Unconfined) {
+ fun testFatalException() = withExceptionHandler({ expectUnreached() }) {
+ rxObservable<Int>(Dispatchers.Unconfined + cehUnreached()) {
expect(1)
throw LinkageError()
}.subscribe({
@@ -47,7 +48,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}, {
expect(2)
})
- finish(4)
+ finish(3)
}
@Test
@@ -66,7 +67,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testFatalExceptionAsynchronous() = withExceptionHandler(handler<LinkageError>(3)) {
+ fun testFatalExceptionAsynchronous() = withExceptionHandler({ expectUnreached() }) {
rxObservable<Int>(Dispatchers.Unconfined) {
expect(1)
throw LinkageError()
@@ -75,20 +76,28 @@ class ObservableExceptionHandlingTest : TestBase() {
.subscribe({
expectUnreached()
}, {
- expect(2) // Fatal exception is not reported in onError
+ expect(2) // Fatal exceptions are not treated in a special manner
})
- finish(4)
+ finish(3)
}
@Test
- fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ fun testFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
+ val latch = CountDownLatch(1)
rxObservable(Dispatchers.Unconfined) {
expect(1)
- send(Unit)
+ val result = trySend(Unit)
+ val exception = result.exceptionOrNull()
+ assertTrue(exception is UndeliverableException)
+ assertTrue(exception.cause is LinkageError)
+ assertTrue(isClosedForSend)
+ expect(4)
+ latch.countDown()
}.subscribe({
expect(2)
throw LinkageError()
- }, { expect(3) }) // Unreached because fatal errors are rethrown
+ }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw.
+ latch.await()
finish(5)
}
@@ -100,7 +109,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}.subscribe({
expect(2)
throw TestException()
- }, { expect(3) }) // not reported to onError because came from the subscribe itself
+ }, { expect(3) })
finish(4)
}
@@ -119,7 +128,7 @@ class ObservableExceptionHandlingTest : TestBase() {
}
@Test
- fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(4)) {
+ fun testAsynchronousFatalExceptionFromSubscribe() = withExceptionHandler(handler<LinkageError>(3)) {
rxObservable(Dispatchers.Unconfined) {
expect(1)
send(Unit)
@@ -128,7 +137,7 @@ class ObservableExceptionHandlingTest : TestBase() {
.subscribe({
expect(2)
throw LinkageError()
- }, { expect(3) })
- finish(5)
+ }, { expectUnreached() }) // Unreached because RxJava bubbles up fatal exceptions, causing `onNext` to throw.
+ finish(4)
}
}
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt
index 4454190f..e246407a 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt
@@ -5,7 +5,9 @@
package kotlinx.coroutines.rx2
import io.reactivex.*
+import io.reactivex.disposables.*
import kotlinx.coroutines.*
+import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
@@ -101,7 +103,7 @@ class ObservableSingleTest : TestBase() {
@Test
fun testAwaitFirstOrNull() {
- val observable = rxObservable<String> {
+ val observable = rxObservable {
send(Observable.empty<String>().awaitFirstOrNull() ?: "OK")
}
@@ -154,6 +156,32 @@ class ObservableSingleTest : TestBase() {
}
}
+ /** Tests that calls to [awaitFirst] (and, thus, the other methods) throw [CancellationException] and dispose of
+ * the subscription when their [Job] is cancelled. */
+ @Test
+ fun testAwaitCancellation() = runTest {
+ expect(1)
+ val observable = ObservableSource<Int> { s ->
+ s.onSubscribe(object: Disposable {
+ override fun dispose() { expect(4) }
+ override fun isDisposed(): Boolean { expectUnreached(); return false }
+ })
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ try {
+ expect(2)
+ observable.awaitFirst()
+ } catch (e: CancellationException) {
+ expect(5)
+ throw e
+ }
+ }
+ expect(3)
+ job.cancelAndJoin()
+ finish(6)
+ }
+
+
@Test
fun testExceptionFromObservable() {
val observable = rxObservable {
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt
index 159f3729..0253fced 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSourceAsFlowStressTest.kt
@@ -26,7 +26,7 @@ class ObservableSourceAsFlowStressTest : TestBase() {
val latch = Channel<Unit>(1)
var i = 0
val observable = Observable.interval(100L, TimeUnit.MICROSECONDS)
- .doOnNext { if (++i > 100) latch.offer(Unit) }
+ .doOnNext { if (++i > 100) latch.trySend(Unit) }
val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default))
latch.receive()
job.cancelAndJoin()
diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt
index 3cd3bbff..2c22cbf0 100644
--- a/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt
@@ -1,12 +1,14 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.rx2
import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import org.junit.Test
+import kotlin.onSuccess
import kotlin.test.*
class ObservableSubscriptionSelectTest : TestBase() {
@@ -18,27 +20,27 @@ class ObservableSubscriptionSelectTest : TestBase() {
var a = 0
var b = 0
// open two subs
- val channelA = source.openSubscription()
- val channelB = source.openSubscription()
+ val channelA = source.toChannel()
+ val channelB = source.toChannel()
loop@ while (true) {
val done: Int = select {
- channelA.onReceiveOrNull {
- if (it != null) assertEquals(a++, it)
- if (it == null) 0 else 1
+ channelA.onReceiveCatching { result ->
+ result.onSuccess { assertEquals(a++, it) }
+ if (result.isSuccess) 1 else 0
}
- channelB.onReceiveOrNull {
- if (it != null) assertEquals(b++, it)
- if (it == null) 0 else 2
+ channelB.onReceiveCatching { result ->
+ result.onSuccess { assertEquals(b++, it) }
+ if (result.isSuccess) 2 else 0
}
}
when (done) {
0 -> break@loop
1 -> {
- val r = channelB.receiveOrNull()
+ val r = channelB.receiveCatching().getOrNull()
if (r != null) assertEquals(b++, r)
}
2 -> {
- val r = channelA.receiveOrNull()
+ val r = channelA.receiveCatching().getOrNull()
if (r != null) assertEquals(a++, r)
}
}
@@ -48,4 +50,4 @@ class ObservableSubscriptionSelectTest : TestBase() {
// should receive one of them fully
assertTrue(a == n || b == n)
}
-} \ No newline at end of file
+}
diff --git a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt
index c66188a1..b359d963 100644
--- a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt
+++ b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt
@@ -9,6 +9,7 @@ import io.reactivex.disposables.*
import io.reactivex.exceptions.*
import io.reactivex.functions.*
import kotlinx.coroutines.*
+import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
@@ -98,6 +99,31 @@ class SingleTest : TestBase() {
assertEquals("OK", Single.just("O").await() + "K")
}
+ /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their
+ * [Job] is cancelled. */
+ @Test
+ fun testSingleAwaitCancellation() = runTest {
+ expect(1)
+ val single = SingleSource<Int> { s ->
+ s.onSubscribe(object: Disposable {
+ override fun dispose() { expect(4) }
+ override fun isDisposed(): Boolean { expectUnreached(); return false }
+ })
+ }
+ val job = launch(start = CoroutineStart.UNDISPATCHED) {
+ try {
+ expect(2)
+ single.await()
+ } catch (e: CancellationException) {
+ expect(5)
+ throw e
+ }
+ }
+ expect(3)
+ job.cancelAndJoin()
+ finish(6)
+ }
+
@Test
fun testSingleEmitAndAwait() {
val single = rxSingle {
@@ -221,7 +247,7 @@ class SingleTest : TestBase() {
fun testFatalExceptionInSingle() = runTest {
rxSingle(Dispatchers.Unconfined) {
throw LinkageError()
- }.subscribe({ _, e -> assertTrue(e is LinkageError); expect(1) })
+ }.subscribe { _, e -> assertTrue(e is LinkageError); expect(1) }
finish(2)
}