aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-rx2/src/RxConvert.kt')
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxConvert.kt18
1 files changed, 9 insertions, 9 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
index 14c24942..497c922c 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt
@@ -26,7 +26,6 @@ import kotlin.coroutines.*
*
* @param context -- the coroutine context from which the resulting completable is going to be signalled
*/
-@ExperimentalCoroutinesApi
public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) {
this@asCompletable.join()
}
@@ -43,7 +42,6 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet
*
* @param context -- the coroutine context from which the resulting maybe is going to be signalled
*/
-@ExperimentalCoroutinesApi
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) {
this@asMaybe.await()
}
@@ -60,7 +58,6 @@ public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMay
*
* @param context -- the coroutine context from which the resulting single is going to be signalled
*/
-@ExperimentalCoroutinesApi
public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle(context) {
this@asSingle.await()
}
@@ -75,17 +72,20 @@ public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T>
* resulting flow to specify a user-defined value and to control what happens when data is produced faster
* than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
*/
-@ExperimentalCoroutinesApi
public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
val disposableRef = AtomicReference<Disposable>()
val observer = object : Observer<T> {
override fun onComplete() { close() }
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
override fun onNext(t: T) {
+ /*
+ * Channel was closed by the downstream, so the exception (if any)
+ * also was handled by the same downstream
+ */
try {
- sendBlocking(t)
- } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
- // Is handled by the downstream flow
+ trySendBlocking(t)
+ } catch (e: InterruptedException) {
+ // RxJava interrupts the source
}
}
override fun onError(e: Throwable) { close(e) }
@@ -104,7 +104,6 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
-@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
/*
* ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
@@ -137,7 +136,6 @@ public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCorout
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
-@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
Flowable.fromPublisher(asPublisher(context))
@@ -151,6 +149,7 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext):
send(t)
}
+/** @suppress **/
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@@ -158,6 +157,7 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext):
public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
asFlowable(context)
+/** @suppress **/
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")