diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-rx2/src')
-rw-r--r-- | reactive/kotlinx-coroutines-rx2/src/RxAwait.kt | 142 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt | 1 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-rx2/src/RxChannel.kt | 49 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt | 36 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-rx2/src/RxConvert.kt | 18 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt | 4 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt | 34 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-rx2/src/RxObservable.kt | 140 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-rx2/src/RxSingle.kt | 34 |
9 files changed, 256 insertions, 202 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt index 6e162c9a..0e0b47eb 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt @@ -15,11 +15,12 @@ import kotlin.coroutines.* // ------------------------ CompletableSource ------------------------ /** - * Awaits for completion of this completable without blocking a thread. - * Returns `Unit` or throws the corresponding exception if this completable had produced error. + * Awaits for completion of this completable without blocking the thread. + * Returns `Unit`, or throws the corresponding exception if this completable produces an error. * * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this - * suspending function is suspended, this function immediately resumes with [CancellationException]. + * suspending function is suspended, this function immediately resumes with [CancellationException] and disposes of its + * subscription. */ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont -> subscribe(object : CompletableObserver { @@ -32,6 +33,37 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine // ------------------------ MaybeSource ------------------------ /** + * Awaits for completion of the [MaybeSource] without blocking the thread. + * Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this + * [MaybeSource] produces an error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this + * function immediately resumes with [CancellationException] and disposes of its subscription. + */ +@Suppress("UNCHECKED_CAST") +public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> + subscribe(object : MaybeObserver<T> { + override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } + override fun onComplete() { cont.resume(null) } + override fun onSuccess(t: T) { cont.resume(t) } + override fun onError(error: Throwable) { cont.resumeWithException(error) } + }) +} + +/** + * Awaits for completion of the [MaybeSource] without blocking the thread. + * Returns the resulting value, or throws if either no value is produced or this [MaybeSource] produces an error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this + * function immediately resumes with [CancellationException] and disposes of its subscription. + * + * @throws NoSuchElementException if no elements were produced by this [MaybeSource]. + */ +public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException() + +/** * Awaits for completion of the maybe without blocking a thread. * Returns the resulting value, null if no value was produced or throws the corresponding exception if this * maybe had produced error. @@ -39,9 +71,19 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. + * + * ### Deprecation + * + * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of + * a value, as opposed to throwing in such case. + * @suppress */ -@Suppress("UNCHECKED_CAST") -public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).awaitOrDefault(null) +@Deprecated( + message = "Deprecated in favor of awaitSingleOrNull()", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull()") +) // Warning since 1.5, error in 1.6, hidden in 1.7 +public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull() /** * Awaits for completion of the maybe without blocking a thread. @@ -51,25 +93,29 @@ public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).aw * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. + * + * ### Deprecation + * + * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for + * details). + * @suppress */ -public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont -> - subscribe(object : MaybeObserver<T> { - override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } - override fun onComplete() { cont.resume(default) } - override fun onSuccess(t: T) { cont.resume(t) } - override fun onError(error: Throwable) { cont.resumeWithException(error) } - }) -} +@Deprecated( + message = "Deprecated in favor of awaitSingleOrNull()", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default") +) // Warning since 1.5, error in 1.6, hidden in 1.7 +public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default // ------------------------ SingleSource ------------------------ /** - * Awaits for completion of the single value without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this single had produced error. + * Awaits for completion of the single value response without blocking the thread. + * Returns the resulting value, or throws the corresponding exception if this response produces an error. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont -> subscribe(object : SingleObserver<T> { @@ -82,69 +128,73 @@ public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine // ------------------------ ObservableSource ------------------------ /** - * Awaits for the first value from the given observable without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable] without blocking the thread and returns the resulting value, or, + * if the observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. * - * @throws NoSuchElementException if observable does not emit any value + * @throws NoSuchElementException if the observable does not emit any value */ public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST) /** - * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without + * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the + * corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default) /** - * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the + * thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding + * exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT) /** - * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted, + * without blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws + * the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ -public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue() +public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = + awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue() /** - * Awaits for the last value from the given observable without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the last value from the given [Observable] without blocking the thread and + * returns the resulting value, or, if this observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. * - * @throws NoSuchElementException if observable does not emit any value + * @throws NoSuchElementException if the observable does not emit any value */ public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST) /** - * Awaits for the single value from the given observable without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or, + * if this observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. * - * @throws NoSuchElementException if observable does not emit any value - * @throws IllegalArgumentException if observable emits more than one value + * @throws NoSuchElementException if the observable does not emit any value + * @throws IllegalArgumentException if the observable emits more than one value */ public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt index 0fe43f1c..3e39033e 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxCancellable.kt @@ -20,6 +20,7 @@ internal fun handleUndeliverableException(cause: Throwable, context: CoroutineCo try { RxJavaPlugins.onError(cause) } catch (e: Throwable) { + cause.addSuppressed(e) handleCoroutineException(context, cause) } } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt index a129196a..bb093b07 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt @@ -9,6 +9,8 @@ import io.reactivex.disposables.* import kotlinx.atomicfu.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.internal.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.reactive.* /** * Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it. @@ -16,8 +18,9 @@ import kotlinx.coroutines.internal.* * * This API is deprecated in the favour of [Flow]. * [MaybeSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first. + * @suppress */ -@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.WARNING) // Will be hidden in 1.4 +@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.ERROR) // Will be hidden in 1.5 public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> { val channel = SubscriptionChannel<T>() subscribe(channel) @@ -30,37 +33,46 @@ public fun <T> MaybeSource<T>.openSubscription(): ReceiveChannel<T> { * * This API is deprecated in the favour of [Flow]. * [ObservableSource] doesn't have a corresponding [Flow] adapter, so it should be transformed to [Observable] first. + * @suppress */ -@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.WARNING) // Will be hidden in 1.4 +@Deprecated(message = "Deprecated in the favour of Flow", level = DeprecationLevel.ERROR) // Will be hidden in 1.5 public fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> { val channel = SubscriptionChannel<T>() subscribe(channel) return channel } -// Will be promoted to error in 1.3.0, removed in 1.4.0 -@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)")) -public suspend inline fun <T> MaybeSource<T>.consumeEach(action: (T) -> Unit): Unit = - openSubscription().consumeEach(action) - -// Will be promoted to error in 1.3.0, removed in 1.4.0 -@Deprecated(message = "Use collect instead", level = DeprecationLevel.ERROR, replaceWith = ReplaceWith("this.collect(action)")) -public suspend inline fun <T> ObservableSource<T>.consumeEach(action: (T) -> Unit): Unit = - openSubscription().consumeEach(action) - /** * Subscribes to this [MaybeSource] and performs the specified action for each received element. - * Cancels subscription if any exception happens during collect. + * + * If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown from + * [collect]. */ public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit = - openSubscription().consumeEach(action) + toChannel().consumeEach(action) /** * Subscribes to this [ObservableSource] and performs the specified action for each received element. - * Cancels subscription if any exception happens during collect. + * + * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from + * [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect]. */ public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit = - openSubscription().consumeEach(action) + toChannel().consumeEach(action) + +@PublishedApi +internal fun <T> MaybeSource<T>.toChannel(): ReceiveChannel<T> { + val channel = SubscriptionChannel<T>() + subscribe(channel) + return channel +} + +@PublishedApi +internal fun <T> ObservableSource<T>.toChannel(): ReceiveChannel<T> { + val channel = SubscriptionChannel<T>() + subscribe(channel) + return channel +} @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") private class SubscriptionChannel<T> : @@ -79,11 +91,12 @@ private class SubscriptionChannel<T> : } override fun onSuccess(t: T) { - offer(t) + trySend(t) + close(cause = null) } override fun onNext(t: T) { - offer(t) + trySend(t) // Safe to ignore return value here, expectedly racing with cancellation } override fun onComplete() { diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt index d0a43fb1..3f915382 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt @@ -2,14 +2,11 @@ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - package kotlinx.coroutines.rx2 import io.reactivex.* import kotlinx.coroutines.* import kotlin.coroutines.* -import kotlin.internal.* /** * Creates cold [Completable] that runs a given [block] in a coroutine and emits its result. @@ -28,17 +25,6 @@ public fun rxCompletable( return rxCompletableInternal(GlobalScope, context, block) } -@Deprecated( - message = "CoroutineScope.rxCompletable is deprecated in favour of top-level rxCompletable", - level = DeprecationLevel.ERROR, - replaceWith = ReplaceWith("rxCompletable(context, block)") -) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 -@LowPriorityInOverloadResolution -public fun CoroutineScope.rxCompletable( - context: CoroutineContext = EmptyCoroutineContext, - block: suspend CoroutineScope.() -> Unit -): Completable = rxCompletableInternal(this, context, block) - private fun rxCompletableInternal( scope: CoroutineScope, // support for legacy rxCompletable in scope context: CoroutineContext, @@ -53,7 +39,7 @@ private fun rxCompletableInternal( private class RxCompletableCoroutine( parentContext: CoroutineContext, private val subscriber: CompletableEmitter -) : AbstractCoroutine<Unit>(parentContext, true) { +) : AbstractCoroutine<Unit>(parentContext, false, true) { override fun onCompleted(value: Unit) { try { subscriber.onComplete() @@ -64,11 +50,25 @@ private class RxCompletableCoroutine( override fun onCancelled(cause: Throwable, handled: Boolean) { try { - if (!subscriber.tryOnError(cause)) { - handleUndeliverableException(cause, context) + if (subscriber.tryOnError(cause)) { + return } } catch (e: Throwable) { - handleUndeliverableException(e, context) + cause.addSuppressed(e) } + handleUndeliverableException(cause, context) } } + +/** + * @suppress + */ +@Deprecated( + message = "CoroutineScope.rxCompletable is deprecated in favour of top-level rxCompletable", + level = DeprecationLevel.HIDDEN, + replaceWith = ReplaceWith("rxCompletable(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 +public fun CoroutineScope.rxCompletable( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> Unit +): Completable = rxCompletableInternal(this, context, block) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt index 14c24942..497c922c 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt @@ -26,7 +26,6 @@ import kotlin.coroutines.* * * @param context -- the coroutine context from which the resulting completable is going to be signalled */ -@ExperimentalCoroutinesApi public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) { this@asCompletable.join() } @@ -43,7 +42,6 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet * * @param context -- the coroutine context from which the resulting maybe is going to be signalled */ -@ExperimentalCoroutinesApi public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) { this@asMaybe.await() } @@ -60,7 +58,6 @@ public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMay * * @param context -- the coroutine context from which the resulting single is going to be signalled */ -@ExperimentalCoroutinesApi public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle(context) { this@asSingle.await() } @@ -75,17 +72,20 @@ public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> * resulting flow to specify a user-defined value and to control what happens when data is produced faster * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. */ -@ExperimentalCoroutinesApi public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow { val disposableRef = AtomicReference<Disposable>() val observer = object : Observer<T> { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } override fun onNext(t: T) { + /* + * Channel was closed by the downstream, so the exception (if any) + * also was handled by the same downstream + */ try { - sendBlocking(t) - } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 - // Is handled by the downstream flow + trySendBlocking(t) + } catch (e: InterruptedException) { + // RxJava interrupts the source } } override fun onError(e: Throwable) { close(e) } @@ -104,7 +104,6 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow { * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ -@ExperimentalCoroutinesApi public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter -> /* * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if @@ -137,7 +136,6 @@ public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCorout * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ -@ExperimentalCoroutinesApi public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> = Flowable.fromPublisher(asPublisher(context)) @@ -151,6 +149,7 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): send(t) } +/** @suppress **/ @Suppress("UNUSED") // KT-42513 @JvmOverloads // binary compatibility @JvmName("from") @@ -158,6 +157,7 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext): public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> = asFlowable(context) +/** @suppress **/ @Suppress("UNUSED") // KT-42513 @JvmOverloads // binary compatibility @JvmName("from") diff --git a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt index 8dfe9576..c856bb4e 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxFlowable.kt @@ -31,7 +31,6 @@ import kotlin.internal.* * * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect */ -@ExperimentalCoroutinesApi public fun <T: Any> rxFlowable( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope<T>.() -> Unit @@ -41,9 +40,10 @@ public fun <T: Any> rxFlowable( return Flowable.fromPublisher(publishInternal(GlobalScope, context, RX_HANDLER, block)) } +/** @suppress */ @Deprecated( message = "CoroutineScope.rxFlowable is deprecated in favour of top-level rxFlowable", - level = DeprecationLevel.ERROR, + level = DeprecationLevel.HIDDEN, replaceWith = ReplaceWith("rxFlowable(context, block)") ) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 @LowPriorityInOverloadResolution diff --git a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt index f5ed48b9..ab713123 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt @@ -2,14 +2,11 @@ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - package kotlinx.coroutines.rx2 import io.reactivex.* import kotlinx.coroutines.* import kotlin.coroutines.* -import kotlin.internal.* /** * Creates cold [maybe][Maybe] that will run a given [block] in a coroutine and emits its result. @@ -29,17 +26,6 @@ public fun <T> rxMaybe( return rxMaybeInternal(GlobalScope, context, block) } -@Deprecated( - message = "CoroutineScope.rxMaybe is deprecated in favour of top-level rxMaybe", - level = DeprecationLevel.ERROR, - replaceWith = ReplaceWith("rxMaybe(context, block)") -) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 -@LowPriorityInOverloadResolution -public fun <T> CoroutineScope.rxMaybe( - context: CoroutineContext = EmptyCoroutineContext, - block: suspend CoroutineScope.() -> T? -): Maybe<T> = rxMaybeInternal(this, context, block) - private fun <T> rxMaybeInternal( scope: CoroutineScope, // support for legacy rxMaybe in scope context: CoroutineContext, @@ -54,7 +40,7 @@ private fun <T> rxMaybeInternal( private class RxMaybeCoroutine<T>( parentContext: CoroutineContext, private val subscriber: MaybeEmitter<T> -) : AbstractCoroutine<T>(parentContext, true) { +) : AbstractCoroutine<T>(parentContext, false, true) { override fun onCompleted(value: T) { try { if (value == null) subscriber.onComplete() else subscriber.onSuccess(value) @@ -65,11 +51,23 @@ private class RxMaybeCoroutine<T>( override fun onCancelled(cause: Throwable, handled: Boolean) { try { - if (!subscriber.tryOnError(cause)) { - handleUndeliverableException(cause, context) + if (subscriber.tryOnError(cause)) { + return } } catch (e: Throwable) { - handleUndeliverableException(e, context) + cause.addSuppressed(e) } + handleUndeliverableException(cause, context) } } + +/** @suppress */ +@Deprecated( + message = "CoroutineScope.rxMaybe is deprecated in favour of top-level rxMaybe", + level = DeprecationLevel.HIDDEN, + replaceWith = ReplaceWith("rxMaybe(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 +public fun <T> CoroutineScope.rxMaybe( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> T? +): Maybe<T> = rxMaybeInternal(this, context, block) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 6d11cb9c..5f409815 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -2,8 +2,6 @@ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - package kotlinx.coroutines.rx2 import io.reactivex.* @@ -11,10 +9,10 @@ import io.reactivex.exceptions.* import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* +import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import kotlin.coroutines.* -import kotlin.internal.* /** * Creates cold [observable][Observable] that will run a given [block] in a coroutine. @@ -32,7 +30,6 @@ import kotlin.internal.* * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. */ -@ExperimentalCoroutinesApi public fun <T : Any> rxObservable( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope<T>.() -> Unit @@ -42,17 +39,6 @@ public fun <T : Any> rxObservable( return rxObservableInternal(GlobalScope, context, block) } -@Deprecated( - message = "CoroutineScope.rxObservable is deprecated in favour of top-level rxObservable", - level = DeprecationLevel.ERROR, - replaceWith = ReplaceWith("rxObservable(context, block)") -) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 -@LowPriorityInOverloadResolution -public fun <T : Any> CoroutineScope.rxObservable( - context: CoroutineContext = EmptyCoroutineContext, - @BuilderInference block: suspend ProducerScope<T>.() -> Unit -): Observable<T> = rxObservableInternal(this, context, block) - private fun <T : Any> rxObservableInternal( scope: CoroutineScope, // support for legacy rxObservable in scope context: CoroutineContext, @@ -68,39 +54,35 @@ private const val OPEN = 0 // open channel, still working private const val CLOSED = -1 // closed, but have not signalled onCompleted/onError yet private const val SIGNALLED = -2 // already signalled subscriber onCompleted/onError -private class RxObservableCoroutine<T: Any>( +private class RxObservableCoroutine<T : Any>( parentContext: CoroutineContext, private val subscriber: ObservableEmitter<T> -) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> { +) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> { override val channel: SendChannel<T> get() = this - // Mutex is locked when while subscriber.onXXX is being invoked + // Mutex is locked while subscriber.onXXX is being invoked private val mutex = Mutex() private val _signal = atomic(OPEN) - override val isClosedForSend: Boolean get() = isCompleted - override val isFull: Boolean = mutex.isLocked + override val isClosedForSend: Boolean get() = !isActive override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause) override fun invokeOnClose(handler: (Throwable?) -> Unit) = throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose") - override fun offer(element: T): Boolean { - if (!mutex.tryLock()) return false - doLockedNext(element) - return true - } + override fun trySend(element: T): ChannelResult<Unit> = + if (!mutex.tryLock()) { + ChannelResult.failure() + } else { + when (val throwable = doLockedNext(element)) { + null -> ChannelResult.success(Unit) + else -> ChannelResult.closed(throwable) + } + } public override suspend fun send(element: T) { - // fast-path -- try send without suspension - if (offer(element)) return - // slow-path does suspend - return sendSuspend(element) - } - - private suspend fun sendSuspend(element: T) { mutex.lock() - doLockedNext(element) + doLockedNext(element)?.let { throw it } } override val onSend: SelectClause2<T, SendChannel<T>> @@ -108,30 +90,39 @@ private class RxObservableCoroutine<T: Any>( // registerSelectSend @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE") - override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) { + override fun <R> registerSelectClause2( + select: SelectInstance<R>, + element: T, + block: suspend (SendChannel<T>) -> R + ) { mutex.onLock.registerSelectClause2(select, null) { - doLockedNext(element) + doLockedNext(element)?.let { throw it } block(this) } } // assert: mutex.isLocked() - private fun doLockedNext(elem: T) { + private fun doLockedNext(elem: T): Throwable? { // check if already closed for send if (!isActive) { doLockedSignalCompleted(completionCause, completionCauseHandled) - throw getCancellationException() + return getCancellationException() } // notify subscriber try { subscriber.onNext(elem) } catch (e: Throwable) { - // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it - // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery, - // this failure is essentially equivalent to a failure of a child coroutine. - cancelCoroutine(e) - mutex.unlock() - throw e + val cause = UndeliverableException(e) + val causeDelivered = close(cause) + unlockAndCheckCompleted() + return if (causeDelivered) { + // `cause` is the reason this channel is closed + cause + } else { + // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception. + handleUndeliverableException(cause, context) + getCancellationException() + } } /* * There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might @@ -140,6 +131,7 @@ private class RxObservableCoroutine<T: Any>( * We have to recheck `isCompleted` after `unlock` anyway. */ unlockAndCheckCompleted() + return null } private fun unlockAndCheckCompleted() { @@ -153,33 +145,31 @@ private class RxObservableCoroutine<T: Any>( private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) { // cancellation failures try { - if (_signal.value >= CLOSED) { - _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) + if (_signal.value == SIGNALLED) + return + _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) + @Suppress("INVISIBLE_MEMBER") + val unwrappedCause = cause?.let { unwrap(it) } + if (unwrappedCause == null) { try { - if (cause != null && cause !is CancellationException) { - /* - * Reactive frameworks have two types of exceptions: regular and fatal. - * Regular are passed to onError. - * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297). - * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether - * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was - * thrown by subscriber or upstream). - * To make behaviour consistent and least surprising, we always handle fatal exceptions - * by coroutines machinery, anyway, they should not be present in regular program flow, - * thus our goal here is just to expose it as soon as possible. - */ - subscriber.tryOnError(cause) - if (!handled && cause.isFatal()) { - handleUndeliverableException(cause, context) - } - } - else { - subscriber.onComplete() - } - } catch (e: Throwable) { - // Unhandled exception (cannot handle in other way, since we are already complete) + subscriber.onComplete() + } catch (e: Exception) { handleUndeliverableException(e, context) } + } else if (unwrappedCause is UndeliverableException && !handled) { + /** Such exceptions are not reported to `onError`, as, according to the reactive specifications, + * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already + * cancelled. */ + handleUndeliverableException(cause, context) + } else if (unwrappedCause !== getCancellationException() || !subscriber.isDisposed) { + try { + /** If the subscriber is already in a terminal state, the error will be signalled to + * `RxJavaPlugins.onError`. */ + subscriber.onError(cause) + } catch (e: Exception) { + cause.addSuppressed(e) + handleUndeliverableException(cause, context) + } } } finally { mutex.unlock() @@ -201,9 +191,13 @@ private class RxObservableCoroutine<T: Any>( } } -internal fun Throwable.isFatal() = try { - Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode - false -} catch (e: Throwable) { - true -} +/** @suppress */ +@Deprecated( + message = "CoroutineScope.rxObservable is deprecated in favour of top-level rxObservable", + level = DeprecationLevel.HIDDEN, + replaceWith = ReplaceWith("rxObservable(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 +public fun <T : Any> CoroutineScope.rxObservable( + context: CoroutineContext = EmptyCoroutineContext, + @BuilderInference block: suspend ProducerScope<T>.() -> Unit +): Observable<T> = rxObservableInternal(this, context, block) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt index b8012b6d..27842a21 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt @@ -2,14 +2,11 @@ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - package kotlinx.coroutines.rx2 import io.reactivex.* import kotlinx.coroutines.* import kotlin.coroutines.* -import kotlin.internal.* /** * Creates cold [single][Single] that will run a given [block] in a coroutine and emits its result. @@ -28,17 +25,6 @@ public fun <T : Any> rxSingle( return rxSingleInternal(GlobalScope, context, block) } -@Deprecated( - message = "CoroutineScope.rxSingle is deprecated in favour of top-level rxSingle", - level = DeprecationLevel.ERROR, - replaceWith = ReplaceWith("rxSingle(context, block)") -) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 -@LowPriorityInOverloadResolution -public fun <T : Any> CoroutineScope.rxSingle( - context: CoroutineContext = EmptyCoroutineContext, - block: suspend CoroutineScope.() -> T -): Single<T> = rxSingleInternal(this, context, block) - private fun <T : Any> rxSingleInternal( scope: CoroutineScope, // support for legacy rxSingle in scope context: CoroutineContext, @@ -53,7 +39,7 @@ private fun <T : Any> rxSingleInternal( private class RxSingleCoroutine<T: Any>( parentContext: CoroutineContext, private val subscriber: SingleEmitter<T> -) : AbstractCoroutine<T>(parentContext, true) { +) : AbstractCoroutine<T>(parentContext, false, true) { override fun onCompleted(value: T) { try { subscriber.onSuccess(value) @@ -64,11 +50,23 @@ private class RxSingleCoroutine<T: Any>( override fun onCancelled(cause: Throwable, handled: Boolean) { try { - if (!subscriber.tryOnError(cause)) { - handleUndeliverableException(cause, context) + if (subscriber.tryOnError(cause)) { + return } } catch (e: Throwable) { - handleUndeliverableException(e, context) + cause.addSuppressed(e) } + handleUndeliverableException(cause, context) } } + +/** @suppress */ +@Deprecated( + message = "CoroutineScope.rxSingle is deprecated in favour of top-level rxSingle", + level = DeprecationLevel.HIDDEN, + replaceWith = ReplaceWith("rxSingle(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 +public fun <T : Any> CoroutineScope.rxSingle( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> T +): Single<T> = rxSingleInternal(this, context, block) |