diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2021-08-23 23:55:41 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2021-08-23 23:55:41 +0000 |
commit | ffe1b7dc19f3fbe1bc4a5f4f1e28c64cd68c5371 (patch) | |
tree | 747e5be83c6e6a2e2d9a164ebd03c5e45e6ec952 /reactive/kotlinx-coroutines-rx3/src/RxChannel.kt | |
parent | 2bbecd60d31b5d0b48847f67e741018f5ae7aa07 (diff) | |
parent | 531d55eaeac09ad6cde55687ef802d4235040985 (diff) | |
download | platform_external_kotlinx.coroutines-ffe1b7dc19f3fbe1bc4a5f4f1e28c64cd68c5371.tar.gz platform_external_kotlinx.coroutines-ffe1b7dc19f3fbe1bc4a5f4f1e28c64cd68c5371.tar.bz2 platform_external_kotlinx.coroutines-ffe1b7dc19f3fbe1bc4a5f4f1e28c64cd68c5371.zip |
Snap for 7668063 from 531d55eaeac09ad6cde55687ef802d4235040985 to simpleperf-releasesimpleperf-release
Change-Id: I844c9c61f2e31a265c36a483e1509e0e08c508d6
Diffstat (limited to 'reactive/kotlinx-coroutines-rx3/src/RxChannel.kt')
-rw-r--r-- | reactive/kotlinx-coroutines-rx3/src/RxChannel.kt | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt index 76333f2e..21238d24 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt @@ -9,6 +9,7 @@ import io.reactivex.rxjava3.disposables.* import kotlinx.atomicfu.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.internal.* +import kotlinx.coroutines.flow.* /** * Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it. @@ -40,14 +41,18 @@ internal fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> { /** * Subscribes to this [MaybeSource] and performs the specified action for each received element. - * Cancels subscription if any exception happens during collect. + * + * If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown from + * [collect]. */ public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit = openSubscription().consumeEach(action) /** * Subscribes to this [ObservableSource] and performs the specified action for each received element. - * Cancels subscription if any exception happens during collect. + * + * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from + * [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect]. */ public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit = openSubscription().consumeEach(action) @@ -69,11 +74,12 @@ private class SubscriptionChannel<T> : } override fun onSuccess(t: T) { - offer(t) + trySend(t) + close(cause = null) } override fun onNext(t: T) { - offer(t) + trySend(t) // Safe to ignore return value here, expectedly racing with cancellation } override fun onComplete() { |