diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-rx2/src/RxObservable.kt')
-rw-r--r-- | reactive/kotlinx-coroutines-rx2/src/RxObservable.kt | 140 |
1 files changed, 67 insertions, 73 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 6d11cb9c..5f409815 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -2,8 +2,6 @@ * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - package kotlinx.coroutines.rx2 import io.reactivex.* @@ -11,10 +9,10 @@ import io.reactivex.exceptions.* import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* +import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import kotlin.coroutines.* -import kotlin.internal.* /** * Creates cold [observable][Observable] that will run a given [block] in a coroutine. @@ -32,7 +30,6 @@ import kotlin.internal.* * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. */ -@ExperimentalCoroutinesApi public fun <T : Any> rxObservable( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope<T>.() -> Unit @@ -42,17 +39,6 @@ public fun <T : Any> rxObservable( return rxObservableInternal(GlobalScope, context, block) } -@Deprecated( - message = "CoroutineScope.rxObservable is deprecated in favour of top-level rxObservable", - level = DeprecationLevel.ERROR, - replaceWith = ReplaceWith("rxObservable(context, block)") -) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 -@LowPriorityInOverloadResolution -public fun <T : Any> CoroutineScope.rxObservable( - context: CoroutineContext = EmptyCoroutineContext, - @BuilderInference block: suspend ProducerScope<T>.() -> Unit -): Observable<T> = rxObservableInternal(this, context, block) - private fun <T : Any> rxObservableInternal( scope: CoroutineScope, // support for legacy rxObservable in scope context: CoroutineContext, @@ -68,39 +54,35 @@ private const val OPEN = 0 // open channel, still working private const val CLOSED = -1 // closed, but have not signalled onCompleted/onError yet private const val SIGNALLED = -2 // already signalled subscriber onCompleted/onError -private class RxObservableCoroutine<T: Any>( +private class RxObservableCoroutine<T : Any>( parentContext: CoroutineContext, private val subscriber: ObservableEmitter<T> -) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> { +) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> { override val channel: SendChannel<T> get() = this - // Mutex is locked when while subscriber.onXXX is being invoked + // Mutex is locked while subscriber.onXXX is being invoked private val mutex = Mutex() private val _signal = atomic(OPEN) - override val isClosedForSend: Boolean get() = isCompleted - override val isFull: Boolean = mutex.isLocked + override val isClosedForSend: Boolean get() = !isActive override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause) override fun invokeOnClose(handler: (Throwable?) -> Unit) = throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose") - override fun offer(element: T): Boolean { - if (!mutex.tryLock()) return false - doLockedNext(element) - return true - } + override fun trySend(element: T): ChannelResult<Unit> = + if (!mutex.tryLock()) { + ChannelResult.failure() + } else { + when (val throwable = doLockedNext(element)) { + null -> ChannelResult.success(Unit) + else -> ChannelResult.closed(throwable) + } + } public override suspend fun send(element: T) { - // fast-path -- try send without suspension - if (offer(element)) return - // slow-path does suspend - return sendSuspend(element) - } - - private suspend fun sendSuspend(element: T) { mutex.lock() - doLockedNext(element) + doLockedNext(element)?.let { throw it } } override val onSend: SelectClause2<T, SendChannel<T>> @@ -108,30 +90,39 @@ private class RxObservableCoroutine<T: Any>( // registerSelectSend @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE") - override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) { + override fun <R> registerSelectClause2( + select: SelectInstance<R>, + element: T, + block: suspend (SendChannel<T>) -> R + ) { mutex.onLock.registerSelectClause2(select, null) { - doLockedNext(element) + doLockedNext(element)?.let { throw it } block(this) } } // assert: mutex.isLocked() - private fun doLockedNext(elem: T) { + private fun doLockedNext(elem: T): Throwable? { // check if already closed for send if (!isActive) { doLockedSignalCompleted(completionCause, completionCauseHandled) - throw getCancellationException() + return getCancellationException() } // notify subscriber try { subscriber.onNext(elem) } catch (e: Throwable) { - // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it - // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery, - // this failure is essentially equivalent to a failure of a child coroutine. - cancelCoroutine(e) - mutex.unlock() - throw e + val cause = UndeliverableException(e) + val causeDelivered = close(cause) + unlockAndCheckCompleted() + return if (causeDelivered) { + // `cause` is the reason this channel is closed + cause + } else { + // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception. + handleUndeliverableException(cause, context) + getCancellationException() + } } /* * There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might @@ -140,6 +131,7 @@ private class RxObservableCoroutine<T: Any>( * We have to recheck `isCompleted` after `unlock` anyway. */ unlockAndCheckCompleted() + return null } private fun unlockAndCheckCompleted() { @@ -153,33 +145,31 @@ private class RxObservableCoroutine<T: Any>( private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) { // cancellation failures try { - if (_signal.value >= CLOSED) { - _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) + if (_signal.value == SIGNALLED) + return + _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) + @Suppress("INVISIBLE_MEMBER") + val unwrappedCause = cause?.let { unwrap(it) } + if (unwrappedCause == null) { try { - if (cause != null && cause !is CancellationException) { - /* - * Reactive frameworks have two types of exceptions: regular and fatal. - * Regular are passed to onError. - * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297). - * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether - * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was - * thrown by subscriber or upstream). - * To make behaviour consistent and least surprising, we always handle fatal exceptions - * by coroutines machinery, anyway, they should not be present in regular program flow, - * thus our goal here is just to expose it as soon as possible. - */ - subscriber.tryOnError(cause) - if (!handled && cause.isFatal()) { - handleUndeliverableException(cause, context) - } - } - else { - subscriber.onComplete() - } - } catch (e: Throwable) { - // Unhandled exception (cannot handle in other way, since we are already complete) + subscriber.onComplete() + } catch (e: Exception) { handleUndeliverableException(e, context) } + } else if (unwrappedCause is UndeliverableException && !handled) { + /** Such exceptions are not reported to `onError`, as, according to the reactive specifications, + * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already + * cancelled. */ + handleUndeliverableException(cause, context) + } else if (unwrappedCause !== getCancellationException() || !subscriber.isDisposed) { + try { + /** If the subscriber is already in a terminal state, the error will be signalled to + * `RxJavaPlugins.onError`. */ + subscriber.onError(cause) + } catch (e: Exception) { + cause.addSuppressed(e) + handleUndeliverableException(cause, context) + } } } finally { mutex.unlock() @@ -201,9 +191,13 @@ private class RxObservableCoroutine<T: Any>( } } -internal fun Throwable.isFatal() = try { - Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode - false -} catch (e: Throwable) { - true -} +/** @suppress */ +@Deprecated( + message = "CoroutineScope.rxObservable is deprecated in favour of top-level rxObservable", + level = DeprecationLevel.HIDDEN, + replaceWith = ReplaceWith("rxObservable(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0 +public fun <T : Any> CoroutineScope.rxObservable( + context: CoroutineContext = EmptyCoroutineContext, + @BuilderInference block: suspend ProducerScope<T>.() -> Unit +): Observable<T> = rxObservableInternal(this, context, block) |