diff options
author | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2021-08-23 23:55:41 +0000 |
---|---|---|
committer | Android Build Coastguard Worker <android-build-coastguard-worker@google.com> | 2021-08-23 23:55:41 +0000 |
commit | ffe1b7dc19f3fbe1bc4a5f4f1e28c64cd68c5371 (patch) | |
tree | 747e5be83c6e6a2e2d9a164ebd03c5e45e6ec952 /reactive/kotlinx-coroutines-rx2/src/RxConvert.kt | |
parent | 2bbecd60d31b5d0b48847f67e741018f5ae7aa07 (diff) | |
parent | 531d55eaeac09ad6cde55687ef802d4235040985 (diff) | |
download | platform_external_kotlinx.coroutines-simpleperf-release.tar.gz platform_external_kotlinx.coroutines-simpleperf-release.tar.bz2 platform_external_kotlinx.coroutines-simpleperf-release.zip |
Snap for 7668063 from 531d55eaeac09ad6cde55687ef802d4235040985 to simpleperf-releasesimpleperf-release
Change-Id: I844c9c61f2e31a265c36a483e1509e0e08c508d6
Diffstat (limited to 'reactive/kotlinx-coroutines-rx2/src/RxConvert.kt')
-rw-r--r-- | reactive/kotlinx-coroutines-rx2/src/RxConvert.kt | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index 14c24942..497c922c 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -26,7 +26,6 @@ import kotlin.coroutines.* * * @param context -- the coroutine context from which the resulting completable is going to be signalled */ -@ExperimentalCoroutinesApi public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) { this@asCompletable.join() } @@ -43,7 +42,6 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet * * @param context -- the coroutine context from which the resulting maybe is going to be signalled */ -@ExperimentalCoroutinesApi public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) { this@asMaybe.await() } @@ -60,7 +58,6 @@ public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMay * * @param context -- the coroutine context from which the resulting single is going to be signalled */ -@ExperimentalCoroutinesApi public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle(context) { this@asSingle.await() } @@ -75,17 +72,20 @@ public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> * resulting flow to specify a user-defined value and to control what happens when data is produced faster * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. */ -@ExperimentalCoroutinesApi public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow { val disposableRef = AtomicReference<Disposable>() val observer = object : Observer<T> { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } override fun onNext(t: T) { + /* + * Channel was closed by the downstream, so the exception (if any) + * also was handled by the same downstream + */ try { - sendBlocking(t) - } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 - // Is handled by the downstream flow + trySendBlocking(t) + } catch (e: InterruptedException) { + // RxJava interrupts the source } } override fun onError(e: Throwable) { close(e) } @@ -104,7 +104,6 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow { * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ -@ExperimentalCoroutinesApi public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter -> /* * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if @@ -137,7 +136,6 @@ public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCorout * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ -@ExperimentalCoroutinesApi public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> = Flowable.fromPublisher(asPublisher(context)) @@ -151,6 +149,7 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): send(t) } +/** @suppress **/ @Suppress("UNUSED") // KT-42513 @JvmOverloads // binary compatibility @JvmName("from") @@ -158,6 +157,7 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> = asFlowable(context) +/** @suppress **/ @Suppress("UNUSED") // KT-42513 @JvmOverloads // binary compatibility @JvmName("from") |