aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-rx2/src/RxObservable.kt')
-rw-r--r--reactive/kotlinx-coroutines-rx2/src/RxObservable.kt140
1 files changed, 67 insertions, 73 deletions
diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
index 6d11cb9c..5f409815 100644
--- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
@@ -2,8 +2,6 @@
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
-
package kotlinx.coroutines.rx2
import io.reactivex.*
@@ -11,10 +9,10 @@ import io.reactivex.exceptions.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
-import kotlin.internal.*
/**
* Creates cold [observable][Observable] that will run a given [block] in a coroutine.
@@ -32,7 +30,6 @@ import kotlin.internal.*
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
*/
-@ExperimentalCoroutinesApi
public fun <T : Any> rxObservable(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
@@ -42,17 +39,6 @@ public fun <T : Any> rxObservable(
return rxObservableInternal(GlobalScope, context, block)
}
-@Deprecated(
- message = "CoroutineScope.rxObservable is deprecated in favour of top-level rxObservable",
- level = DeprecationLevel.ERROR,
- replaceWith = ReplaceWith("rxObservable(context, block)")
-) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
-@LowPriorityInOverloadResolution
-public fun <T : Any> CoroutineScope.rxObservable(
- context: CoroutineContext = EmptyCoroutineContext,
- @BuilderInference block: suspend ProducerScope<T>.() -> Unit
-): Observable<T> = rxObservableInternal(this, context, block)
-
private fun <T : Any> rxObservableInternal(
scope: CoroutineScope, // support for legacy rxObservable in scope
context: CoroutineContext,
@@ -68,39 +54,35 @@ private const val OPEN = 0 // open channel, still working
private const val CLOSED = -1 // closed, but have not signalled onCompleted/onError yet
private const val SIGNALLED = -2 // already signalled subscriber onCompleted/onError
-private class RxObservableCoroutine<T: Any>(
+private class RxObservableCoroutine<T : Any>(
parentContext: CoroutineContext,
private val subscriber: ObservableEmitter<T>
-) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> {
+) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> {
override val channel: SendChannel<T> get() = this
- // Mutex is locked when while subscriber.onXXX is being invoked
+ // Mutex is locked while subscriber.onXXX is being invoked
private val mutex = Mutex()
private val _signal = atomic(OPEN)
- override val isClosedForSend: Boolean get() = isCompleted
- override val isFull: Boolean = mutex.isLocked
+ override val isClosedForSend: Boolean get() = !isActive
override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
override fun invokeOnClose(handler: (Throwable?) -> Unit) =
throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose")
- override fun offer(element: T): Boolean {
- if (!mutex.tryLock()) return false
- doLockedNext(element)
- return true
- }
+ override fun trySend(element: T): ChannelResult<Unit> =
+ if (!mutex.tryLock()) {
+ ChannelResult.failure()
+ } else {
+ when (val throwable = doLockedNext(element)) {
+ null -> ChannelResult.success(Unit)
+ else -> ChannelResult.closed(throwable)
+ }
+ }
public override suspend fun send(element: T) {
- // fast-path -- try send without suspension
- if (offer(element)) return
- // slow-path does suspend
- return sendSuspend(element)
- }
-
- private suspend fun sendSuspend(element: T) {
mutex.lock()
- doLockedNext(element)
+ doLockedNext(element)?.let { throw it }
}
override val onSend: SelectClause2<T, SendChannel<T>>
@@ -108,30 +90,39 @@ private class RxObservableCoroutine<T: Any>(
// registerSelectSend
@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
- override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
+ override fun <R> registerSelectClause2(
+ select: SelectInstance<R>,
+ element: T,
+ block: suspend (SendChannel<T>) -> R
+ ) {
mutex.onLock.registerSelectClause2(select, null) {
- doLockedNext(element)
+ doLockedNext(element)?.let { throw it }
block(this)
}
}
// assert: mutex.isLocked()
- private fun doLockedNext(elem: T) {
+ private fun doLockedNext(elem: T): Throwable? {
// check if already closed for send
if (!isActive) {
doLockedSignalCompleted(completionCause, completionCauseHandled)
- throw getCancellationException()
+ return getCancellationException()
}
// notify subscriber
try {
subscriber.onNext(elem)
} catch (e: Throwable) {
- // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it
- // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
- // this failure is essentially equivalent to a failure of a child coroutine.
- cancelCoroutine(e)
- mutex.unlock()
- throw e
+ val cause = UndeliverableException(e)
+ val causeDelivered = close(cause)
+ unlockAndCheckCompleted()
+ return if (causeDelivered) {
+ // `cause` is the reason this channel is closed
+ cause
+ } else {
+ // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception.
+ handleUndeliverableException(cause, context)
+ getCancellationException()
+ }
}
/*
* There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
@@ -140,6 +131,7 @@ private class RxObservableCoroutine<T: Any>(
* We have to recheck `isCompleted` after `unlock` anyway.
*/
unlockAndCheckCompleted()
+ return null
}
private fun unlockAndCheckCompleted() {
@@ -153,33 +145,31 @@ private class RxObservableCoroutine<T: Any>(
private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
// cancellation failures
try {
- if (_signal.value >= CLOSED) {
- _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
+ if (_signal.value == SIGNALLED)
+ return
+ _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
+ @Suppress("INVISIBLE_MEMBER")
+ val unwrappedCause = cause?.let { unwrap(it) }
+ if (unwrappedCause == null) {
try {
- if (cause != null && cause !is CancellationException) {
- /*
- * Reactive frameworks have two types of exceptions: regular and fatal.
- * Regular are passed to onError.
- * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297).
- * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
- * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
- * thrown by subscriber or upstream).
- * To make behaviour consistent and least surprising, we always handle fatal exceptions
- * by coroutines machinery, anyway, they should not be present in regular program flow,
- * thus our goal here is just to expose it as soon as possible.
- */
- subscriber.tryOnError(cause)
- if (!handled && cause.isFatal()) {
- handleUndeliverableException(cause, context)
- }
- }
- else {
- subscriber.onComplete()
- }
- } catch (e: Throwable) {
- // Unhandled exception (cannot handle in other way, since we are already complete)
+ subscriber.onComplete()
+ } catch (e: Exception) {
handleUndeliverableException(e, context)
}
+ } else if (unwrappedCause is UndeliverableException && !handled) {
+ /** Such exceptions are not reported to `onError`, as, according to the reactive specifications,
+ * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already
+ * cancelled. */
+ handleUndeliverableException(cause, context)
+ } else if (unwrappedCause !== getCancellationException() || !subscriber.isDisposed) {
+ try {
+ /** If the subscriber is already in a terminal state, the error will be signalled to
+ * `RxJavaPlugins.onError`. */
+ subscriber.onError(cause)
+ } catch (e: Exception) {
+ cause.addSuppressed(e)
+ handleUndeliverableException(cause, context)
+ }
}
} finally {
mutex.unlock()
@@ -201,9 +191,13 @@ private class RxObservableCoroutine<T: Any>(
}
}
-internal fun Throwable.isFatal() = try {
- Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode
- false
-} catch (e: Throwable) {
- true
-}
+/** @suppress */
+@Deprecated(
+ message = "CoroutineScope.rxObservable is deprecated in favour of top-level rxObservable",
+ level = DeprecationLevel.HIDDEN,
+ replaceWith = ReplaceWith("rxObservable(context, block)")
+) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0
+public fun <T : Any> CoroutineScope.rxObservable(
+ context: CoroutineContext = EmptyCoroutineContext,
+ @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Observable<T> = rxObservableInternal(this, context, block)