aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-rx3/src
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-rx3/src')
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxAwait.kt145
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt1
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxChannel.kt14
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt9
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxConvert.kt18
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt1
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt9
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxObservable.kt116
-rw-r--r--reactive/kotlinx-coroutines-rx3/src/RxSingle.kt9
9 files changed, 191 insertions, 131 deletions
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt
index 8ac0a10c..2a14cf7c 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt
@@ -15,11 +15,12 @@ import kotlin.coroutines.*
// ------------------------ CompletableSource ------------------------
/**
- * Awaits for completion of this completable without blocking a thread.
- * Returns `Unit` or throws the corresponding exception if this completable had produced error.
+ * Awaits for completion of this completable without blocking the thread.
+ * Returns `Unit`, or throws the corresponding exception if this completable produces an error.
*
* This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this
- * suspending function is suspended, this function immediately resumes with [CancellationException].
+ * suspending function is suspended, this function immediately resumes with [CancellationException] and disposes of its
+ * subscription.
*/
public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont ->
subscribe(object : CompletableObserver {
@@ -32,6 +33,37 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
// ------------------------ MaybeSource ------------------------
/**
+ * Awaits for completion of the [MaybeSource] without blocking the thread.
+ * Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this
+ * [MaybeSource] produces an error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
+ * function immediately resumes with [CancellationException] and disposes of its subscription.
+ */
+@Suppress("UNCHECKED_CAST")
+public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont ->
+ subscribe(object : MaybeObserver<T> {
+ override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
+ override fun onComplete() { cont.resume(null) }
+ override fun onSuccess(t: T) { cont.resume(t) }
+ override fun onError(error: Throwable) { cont.resumeWithException(error) }
+ })
+}
+
+/**
+ * Awaits for completion of the [MaybeSource] without blocking the thread.
+ * Returns the resulting value, or throws if either no value is produced or this [MaybeSource] produces an error.
+ *
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this
+ * function immediately resumes with [CancellationException] and disposes of its subscription.
+ *
+ * @throws NoSuchElementException if no elements were produced by this [MaybeSource].
+ */
+public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException()
+
+/**
* Awaits for completion of the maybe without blocking a thread.
* Returns the resulting value, null if no value was produced or throws the corresponding exception if this
* maybe had produced error.
@@ -39,9 +71,20 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
+ *
+ * ### Deprecation
+ *
+ * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of
+ * a value, as opposed to throwing in such case.
+ *
+ * @suppress
*/
-@Suppress("UNCHECKED_CAST")
-public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).awaitOrDefault(null)
+@Deprecated(
+ message = "Deprecated in favor of awaitSingleOrNull()",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("this.awaitSingleOrNull()")
+) // Warning since 1.5, error in 1.6, hidden in 1.7
+public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull()
/**
* Awaits for completion of the maybe without blocking a thread.
@@ -51,25 +94,30 @@ public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).aw
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
+ *
+ * ### Deprecation
+ *
+ * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for
+ * details).
+ *
+ * @suppress
*/
-public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont ->
- subscribe(object : MaybeObserver<T> {
- override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) }
- override fun onComplete() { cont.resume(default) }
- override fun onSuccess(t: T) { cont.resume(t) }
- override fun onError(error: Throwable) { cont.resumeWithException(error) }
- })
-}
+@Deprecated(
+ message = "Deprecated in favor of awaitSingleOrNull()",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default")
+) // Warning since 1.5, error in 1.6, hidden in 1.7
+public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default
// ------------------------ SingleSource ------------------------
/**
- * Awaits for completion of the single value without blocking a thread.
- * Returns the resulting value or throws the corresponding exception if this single had produced error.
+ * Awaits for completion of the single value response without blocking the thread.
+ * Returns the resulting value, or throws the corresponding exception if this response produces an error.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont ->
subscribe(object : SingleObserver<T> {
@@ -82,69 +130,73 @@ public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine
// ------------------------ ObservableSource ------------------------
/**
- * Awaits for the first value from the given observable without blocking a thread.
- * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the first value from the given [Observable] without blocking the thread and returns the resulting value, or,
+ * if the observable has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*
- * @throws NoSuchElementException if observable does not emit any value
+ * @throws NoSuchElementException if the observable does not emit any value
*/
public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
/**
- * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
- * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without
+ * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the
+ * corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
/**
- * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
- * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the
+ * thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding
+ * exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*/
public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
/**
- * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
- * thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted,
+ * without blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws
+ * the corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*/
-public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
+public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T =
+ awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
/**
- * Awaits for the last value from the given observable without blocking a thread.
- * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the last value from the given [Observable] without blocking the thread and
+ * returns the resulting value, or, if this observable has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*
- * @throws NoSuchElementException if observable does not emit any value
+ * @throws NoSuchElementException if the observable does not emit any value
*/
public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST)
/**
- * Awaits for the single value from the given observable without blocking a thread.
- * Returns the resulting value or throws the corresponding exception if this observable had produced error.
+ * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or,
+ * if this observable has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
- * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
- * immediately resumes with [CancellationException].
+ * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
+ * function immediately disposes of its subscription and resumes with [CancellationException].
*
- * @throws NoSuchElementException if observable does not emit any value
- * @throws IllegalArgumentException if observable emits more than one value
+ * @throws NoSuchElementException if the observable does not emit any value
+ * @throws IllegalArgumentException if the observable emits more than one value
*/
public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
@@ -218,3 +270,4 @@ private suspend fun <T> ObservableSource<T>.awaitOne(
}
})
}
+
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt b/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt
index 29951598..1017b112 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt
@@ -20,6 +20,7 @@ internal fun handleUndeliverableException(cause: Throwable, context: CoroutineCo
try {
RxJavaPlugins.onError(cause)
} catch (e: Throwable) {
+ cause.addSuppressed(e)
handleCoroutineException(context, cause)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt
index 76333f2e..21238d24 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt
@@ -9,6 +9,7 @@ import io.reactivex.rxjava3.disposables.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*
+import kotlinx.coroutines.flow.*
/**
* Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it.
@@ -40,14 +41,18 @@ internal fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> {
/**
* Subscribes to this [MaybeSource] and performs the specified action for each received element.
- * Cancels subscription if any exception happens during collect.
+ *
+ * If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown from
+ * [collect].
*/
public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit =
openSubscription().consumeEach(action)
/**
* Subscribes to this [ObservableSource] and performs the specified action for each received element.
- * Cancels subscription if any exception happens during collect.
+ *
+ * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
+ * [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect].
*/
public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit =
openSubscription().consumeEach(action)
@@ -69,11 +74,12 @@ private class SubscriptionChannel<T> :
}
override fun onSuccess(t: T) {
- offer(t)
+ trySend(t)
+ close(cause = null)
}
override fun onNext(t: T) {
- offer(t)
+ trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
}
override fun onComplete() {
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt
index f4c5d7e7..47cc6ad3 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt
@@ -39,7 +39,7 @@ private fun rxCompletableInternal(
private class RxCompletableCoroutine(
parentContext: CoroutineContext,
private val subscriber: CompletableEmitter
-) : AbstractCoroutine<Unit>(parentContext, true) {
+) : AbstractCoroutine<Unit>(parentContext, false, true) {
override fun onCompleted(value: Unit) {
try {
subscriber.onComplete()
@@ -50,11 +50,12 @@ private class RxCompletableCoroutine(
override fun onCancelled(cause: Throwable, handled: Boolean) {
try {
- if (!subscriber.tryOnError(cause)) {
- handleUndeliverableException(cause, context)
+ if (subscriber.tryOnError(cause)) {
+ return
}
} catch (e: Throwable) {
- handleUndeliverableException(e, context)
+ cause.addSuppressed(e)
}
+ handleUndeliverableException(cause, context)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
index 63e30f26..b4693a55 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt
@@ -26,7 +26,6 @@ import kotlin.coroutines.*
*
* @param context -- the coroutine context from which the resulting completable is going to be signalled
*/
-@ExperimentalCoroutinesApi
public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) {
this@asCompletable.join()
}
@@ -43,7 +42,6 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet
*
* @param context -- the coroutine context from which the resulting maybe is going to be signalled
*/
-@ExperimentalCoroutinesApi
public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) {
this@asMaybe.await()
}
@@ -60,7 +58,6 @@ public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMay
*
* @param context -- the coroutine context from which the resulting single is going to be signalled
*/
-@ExperimentalCoroutinesApi
public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle(context) {
this@asSingle.await()
}
@@ -75,17 +72,20 @@ public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T>
* resulting flow to specify a user-defined value and to control what happens when data is produced faster
* than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
*/
-@ExperimentalCoroutinesApi
public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
val disposableRef = AtomicReference<Disposable>()
val observer = object : Observer<T> {
override fun onComplete() { close() }
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
override fun onNext(t: T) {
+ /*
+ * Channel was closed by the downstream, so the exception (if any)
+ * also was handled by the same downstream
+ */
try {
- sendBlocking(t)
- } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974
- // Is handled by the downstream flow
+ trySendBlocking(t)
+ } catch (e: InterruptedException) {
+ // RxJava interrupts the source
}
}
override fun onError(e: Throwable) { close(e) }
@@ -104,7 +104,6 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
-@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter ->
/*
* ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if
@@ -137,10 +136,10 @@ public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCorout
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
-@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
Flowable.fromPublisher(asPublisher(context))
+/** @suppress */
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
@@ -148,6 +147,7 @@ public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutin
public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> =
asFlowable(context)
+/** @suppress */
@Suppress("UNUSED") // KT-42513
@JvmOverloads // binary compatibility
@JvmName("from")
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt b/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt
index 445a6140..9357f283 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt
@@ -28,7 +28,6 @@ import kotlin.coroutines.*
*
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
*/
-@ExperimentalCoroutinesApi
public fun <T: Any> rxFlowable(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt
index ca1d5b59..12d0197b 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt
@@ -40,7 +40,7 @@ private fun <T> rxMaybeInternal(
private class RxMaybeCoroutine<T>(
parentContext: CoroutineContext,
private val subscriber: MaybeEmitter<T>
-) : AbstractCoroutine<T>(parentContext, true) {
+) : AbstractCoroutine<T>(parentContext, false, true) {
override fun onCompleted(value: T) {
try {
if (value == null) subscriber.onComplete() else subscriber.onSuccess(value)
@@ -51,11 +51,12 @@ private class RxMaybeCoroutine<T>(
override fun onCancelled(cause: Throwable, handled: Boolean) {
try {
- if (!subscriber.tryOnError(cause)) {
- handleUndeliverableException(cause, context)
+ if (subscriber.tryOnError(cause)) {
+ return
}
} catch (e: Throwable) {
- handleUndeliverableException(e, context)
+ cause.addSuppressed(e)
}
+ handleUndeliverableException(cause, context)
}
}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
index 7bd07750..57007bbd 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
@@ -12,6 +12,7 @@ import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
+import kotlinx.coroutines.internal.*
/**
* Creates cold [observable][Observable] that will run a given [block] in a coroutine.
@@ -29,7 +30,6 @@ import kotlin.coroutines.*
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
*/
-@ExperimentalCoroutinesApi
public fun <T : Any> rxObservable(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
@@ -54,39 +54,35 @@ private const val OPEN = 0 // open channel, still working
private const val CLOSED = -1 // closed, but have not signalled onCompleted/onError yet
private const val SIGNALLED = -2 // already signalled subscriber onCompleted/onError
-private class RxObservableCoroutine<T: Any>(
+private class RxObservableCoroutine<T : Any>(
parentContext: CoroutineContext,
private val subscriber: ObservableEmitter<T>
-) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> {
+) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> {
override val channel: SendChannel<T> get() = this
- // Mutex is locked when while subscriber.onXXX is being invoked
+ // Mutex is locked while subscriber.onXXX is being invoked
private val mutex = Mutex()
private val _signal = atomic(OPEN)
- override val isClosedForSend: Boolean get() = isCompleted
- override val isFull: Boolean = mutex.isLocked
+ override val isClosedForSend: Boolean get() = !isActive
override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
override fun invokeOnClose(handler: (Throwable?) -> Unit) =
throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose")
- override fun offer(element: T): Boolean {
- if (!mutex.tryLock()) return false
- doLockedNext(element)
- return true
- }
+ override fun trySend(element: T): ChannelResult<Unit> =
+ if (!mutex.tryLock()) {
+ ChannelResult.failure()
+ } else {
+ when (val throwable = doLockedNext(element)) {
+ null -> ChannelResult.success(Unit)
+ else -> ChannelResult.closed(throwable)
+ }
+ }
public override suspend fun send(element: T) {
- // fast-path -- try send without suspension
- if (offer(element)) return
- // slow-path does suspend
- return sendSuspend(element)
- }
-
- private suspend fun sendSuspend(element: T) {
mutex.lock()
- doLockedNext(element)
+ doLockedNext(element)?.let { throw it }
}
override val onSend: SelectClause2<T, SendChannel<T>>
@@ -94,30 +90,39 @@ private class RxObservableCoroutine<T: Any>(
// registerSelectSend
@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
- override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
+ override fun <R> registerSelectClause2(
+ select: SelectInstance<R>,
+ element: T,
+ block: suspend (SendChannel<T>) -> R
+ ) {
mutex.onLock.registerSelectClause2(select, null) {
- doLockedNext(element)
+ doLockedNext(element)?.let { throw it }
block(this)
}
}
// assert: mutex.isLocked()
- private fun doLockedNext(elem: T) {
+ private fun doLockedNext(elem: T): Throwable? {
// check if already closed for send
if (!isActive) {
doLockedSignalCompleted(completionCause, completionCauseHandled)
- throw getCancellationException()
+ return getCancellationException()
}
// notify subscriber
try {
subscriber.onNext(elem)
} catch (e: Throwable) {
- // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it
- // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
- // this failure is essentially equivalent to a failure of a child coroutine.
- cancelCoroutine(e)
- mutex.unlock()
- throw e
+ val cause = UndeliverableException(e)
+ val causeDelivered = close(cause)
+ unlockAndCheckCompleted()
+ return if (causeDelivered) {
+ // `cause` is the reason this channel is closed
+ cause
+ } else {
+ // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception.
+ handleUndeliverableException(cause, context)
+ getCancellationException()
+ }
}
/*
* There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might
@@ -126,6 +131,7 @@ private class RxObservableCoroutine<T: Any>(
* We have to recheck `isCompleted` after `unlock` anyway.
*/
unlockAndCheckCompleted()
+ return null
}
private fun unlockAndCheckCompleted() {
@@ -139,33 +145,31 @@ private class RxObservableCoroutine<T: Any>(
private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
// cancellation failures
try {
- if (_signal.value >= CLOSED) {
- _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
+ if (_signal.value == SIGNALLED)
+ return
+ _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
+ @Suppress("INVISIBLE_MEMBER")
+ val unwrappedCause = cause?.let { unwrap(it) }
+ if (unwrappedCause == null) {
try {
- if (cause != null && cause !is CancellationException) {
- /*
- * Reactive frameworks have two types of exceptions: regular and fatal.
- * Regular are passed to onError.
- * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297).
- * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
- * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
- * thrown by subscriber or upstream).
- * To make behaviour consistent and least surprising, we always handle fatal exceptions
- * by coroutines machinery, anyway, they should not be present in regular program flow,
- * thus our goal here is just to expose it as soon as possible.
- */
- subscriber.tryOnError(cause)
- if (!handled && cause.isFatal()) {
- handleUndeliverableException(cause, context)
- }
- }
- else {
- subscriber.onComplete()
- }
- } catch (e: Throwable) {
- // Unhandled exception (cannot handle in other way, since we are already complete)
+ subscriber.onComplete()
+ } catch (e: Exception) {
handleUndeliverableException(e, context)
}
+ } else if (unwrappedCause is UndeliverableException && !handled) {
+ /** Such exceptions are not reported to `onError`, as, according to the reactive specifications,
+ * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already
+ * cancelled. */
+ handleUndeliverableException(cause, context)
+ } else if (unwrappedCause !== getCancellationException() || !subscriber.isDisposed) {
+ try {
+ /** If the subscriber is already in a terminal state, the error will be signalled to
+ * `RxJavaPlugins.onError`. */
+ subscriber.onError(cause)
+ } catch (e: Exception) {
+ cause.addSuppressed(e)
+ handleUndeliverableException(cause, context)
+ }
}
} finally {
mutex.unlock()
@@ -187,9 +191,3 @@ private class RxObservableCoroutine<T: Any>(
}
}
-internal fun Throwable.isFatal() = try {
- Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode
- false
-} catch (e: Throwable) {
- true
-}
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt
index f4d07fb4..e7678f0d 100644
--- a/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt
+++ b/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt
@@ -39,7 +39,7 @@ private fun <T : Any> rxSingleInternal(
private class RxSingleCoroutine<T: Any>(
parentContext: CoroutineContext,
private val subscriber: SingleEmitter<T>
-) : AbstractCoroutine<T>(parentContext, true) {
+) : AbstractCoroutine<T>(parentContext, false, true) {
override fun onCompleted(value: T) {
try {
subscriber.onSuccess(value)
@@ -50,11 +50,12 @@ private class RxSingleCoroutine<T: Any>(
override fun onCancelled(cause: Throwable, handled: Boolean) {
try {
- if (!subscriber.tryOnError(cause)) {
- handleUndeliverableException(cause, context)
+ if (subscriber.tryOnError(cause)) {
+ return
}
} catch (e: Throwable) {
- handleUndeliverableException(e, context)
+ cause.addSuppressed(e)
}
+ handleUndeliverableException(cause, context)
}
}