diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-rx3/src')
-rw-r--r-- | reactive/kotlinx-coroutines-rx3/src/RxAwait.kt | 145 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt | 1 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-rx3/src/RxChannel.kt | 14 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt | 9 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-rx3/src/RxConvert.kt | 18 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt | 1 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt | 9 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-rx3/src/RxObservable.kt | 116 | ||||
-rw-r--r-- | reactive/kotlinx-coroutines-rx3/src/RxSingle.kt | 9 |
9 files changed, 191 insertions, 131 deletions
diff --git a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt index 8ac0a10c..2a14cf7c 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt @@ -15,11 +15,12 @@ import kotlin.coroutines.* // ------------------------ CompletableSource ------------------------ /** - * Awaits for completion of this completable without blocking a thread. - * Returns `Unit` or throws the corresponding exception if this completable had produced error. + * Awaits for completion of this completable without blocking the thread. + * Returns `Unit`, or throws the corresponding exception if this completable produces an error. * * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this - * suspending function is suspended, this function immediately resumes with [CancellationException]. + * suspending function is suspended, this function immediately resumes with [CancellationException] and disposes of its + * subscription. */ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont -> subscribe(object : CompletableObserver { @@ -32,6 +33,37 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine // ------------------------ MaybeSource ------------------------ /** + * Awaits for completion of the [MaybeSource] without blocking the thread. + * Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this + * [MaybeSource] produces an error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this + * function immediately resumes with [CancellationException] and disposes of its subscription. + */ +@Suppress("UNCHECKED_CAST") +public suspend fun <T> MaybeSource<T>.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> + subscribe(object : MaybeObserver<T> { + override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } + override fun onComplete() { cont.resume(null) } + override fun onSuccess(t: T) { cont.resume(t) } + override fun onError(error: Throwable) { cont.resumeWithException(error) } + }) +} + +/** + * Awaits for completion of the [MaybeSource] without blocking the thread. + * Returns the resulting value, or throws if either no value is produced or this [MaybeSource] produces an error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this + * function immediately resumes with [CancellationException] and disposes of its subscription. + * + * @throws NoSuchElementException if no elements were produced by this [MaybeSource]. + */ +public suspend fun <T> MaybeSource<T>.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException() + +/** * Awaits for completion of the maybe without blocking a thread. * Returns the resulting value, null if no value was produced or throws the corresponding exception if this * maybe had produced error. @@ -39,9 +71,20 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. + * + * ### Deprecation + * + * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of + * a value, as opposed to throwing in such case. + * + * @suppress */ -@Suppress("UNCHECKED_CAST") -public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).awaitOrDefault(null) +@Deprecated( + message = "Deprecated in favor of awaitSingleOrNull()", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull()") +) // Warning since 1.5, error in 1.6, hidden in 1.7 +public suspend fun <T> MaybeSource<T>.await(): T? = awaitSingleOrNull() /** * Awaits for completion of the maybe without blocking a thread. @@ -51,25 +94,30 @@ public suspend fun <T> MaybeSource<T>.await(): T? = (this as MaybeSource<T?>).aw * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. + * + * ### Deprecation + * + * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for + * details). + * + * @suppress */ -public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont -> - subscribe(object : MaybeObserver<T> { - override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } - override fun onComplete() { cont.resume(default) } - override fun onSuccess(t: T) { cont.resume(t) } - override fun onError(error: Throwable) { cont.resumeWithException(error) } - }) -} +@Deprecated( + message = "Deprecated in favor of awaitSingleOrNull()", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default") +) // Warning since 1.5, error in 1.6, hidden in 1.7 +public suspend fun <T> MaybeSource<T>.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default // ------------------------ SingleSource ------------------------ /** - * Awaits for completion of the single value without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this single had produced error. + * Awaits for completion of the single value response without blocking the thread. + * Returns the resulting value, or throws the corresponding exception if this response produces an error. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine { cont -> subscribe(object : SingleObserver<T> { @@ -82,69 +130,73 @@ public suspend fun <T> SingleSource<T>.await(): T = suspendCancellableCoroutine // ------------------------ ObservableSource ------------------------ /** - * Awaits for the first value from the given observable without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable] without blocking the thread and returns the resulting value, or, + * if the observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. * - * @throws NoSuchElementException if observable does not emit any value + * @throws NoSuchElementException if the observable does not emit any value */ public suspend fun <T> ObservableSource<T>.awaitFirst(): T = awaitOne(Mode.FIRST) /** - * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without + * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the + * corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun <T> ObservableSource<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default) /** - * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the + * thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding + * exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun <T> ObservableSource<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT) /** - * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted, + * without blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws + * the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ -public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue() +public suspend fun <T> ObservableSource<T>.awaitFirstOrElse(defaultValue: () -> T): T = + awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue() /** - * Awaits for the last value from the given observable without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the last value from the given [Observable] without blocking the thread and + * returns the resulting value, or, if this observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. * - * @throws NoSuchElementException if observable does not emit any value + * @throws NoSuchElementException if the observable does not emit any value */ public suspend fun <T> ObservableSource<T>.awaitLast(): T = awaitOne(Mode.LAST) /** - * Awaits for the single value from the given observable without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or, + * if this observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. * - * @throws NoSuchElementException if observable does not emit any value - * @throws IllegalArgumentException if observable emits more than one value + * @throws NoSuchElementException if the observable does not emit any value + * @throws IllegalArgumentException if the observable emits more than one value */ public suspend fun <T> ObservableSource<T>.awaitSingle(): T = awaitOne(Mode.SINGLE) @@ -218,3 +270,4 @@ private suspend fun <T> ObservableSource<T>.awaitOne( } }) } + diff --git a/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt b/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt index 29951598..1017b112 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxCancellable.kt @@ -20,6 +20,7 @@ internal fun handleUndeliverableException(cause: Throwable, context: CoroutineCo try { RxJavaPlugins.onError(cause) } catch (e: Throwable) { + cause.addSuppressed(e) handleCoroutineException(context, cause) } } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt index 76333f2e..21238d24 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt @@ -9,6 +9,7 @@ import io.reactivex.rxjava3.disposables.* import kotlinx.atomicfu.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.internal.* +import kotlinx.coroutines.flow.* /** * Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it. @@ -40,14 +41,18 @@ internal fun <T> ObservableSource<T>.openSubscription(): ReceiveChannel<T> { /** * Subscribes to this [MaybeSource] and performs the specified action for each received element. - * Cancels subscription if any exception happens during collect. + * + * If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown from + * [collect]. */ public suspend inline fun <T> MaybeSource<T>.collect(action: (T) -> Unit): Unit = openSubscription().consumeEach(action) /** * Subscribes to this [ObservableSource] and performs the specified action for each received element. - * Cancels subscription if any exception happens during collect. + * + * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from + * [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect]. */ public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit = openSubscription().consumeEach(action) @@ -69,11 +74,12 @@ private class SubscriptionChannel<T> : } override fun onSuccess(t: T) { - offer(t) + trySend(t) + close(cause = null) } override fun onNext(t: T) { - offer(t) + trySend(t) // Safe to ignore return value here, expectedly racing with cancellation } override fun onComplete() { diff --git a/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt index f4c5d7e7..47cc6ad3 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt @@ -39,7 +39,7 @@ private fun rxCompletableInternal( private class RxCompletableCoroutine( parentContext: CoroutineContext, private val subscriber: CompletableEmitter -) : AbstractCoroutine<Unit>(parentContext, true) { +) : AbstractCoroutine<Unit>(parentContext, false, true) { override fun onCompleted(value: Unit) { try { subscriber.onComplete() @@ -50,11 +50,12 @@ private class RxCompletableCoroutine( override fun onCancelled(cause: Throwable, handled: Boolean) { try { - if (!subscriber.tryOnError(cause)) { - handleUndeliverableException(cause, context) + if (subscriber.tryOnError(cause)) { + return } } catch (e: Throwable) { - handleUndeliverableException(e, context) + cause.addSuppressed(e) } + handleUndeliverableException(cause, context) } } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt index 63e30f26..b4693a55 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxConvert.kt @@ -26,7 +26,6 @@ import kotlin.coroutines.* * * @param context -- the coroutine context from which the resulting completable is going to be signalled */ -@ExperimentalCoroutinesApi public fun Job.asCompletable(context: CoroutineContext): Completable = rxCompletable(context) { this@asCompletable.join() } @@ -43,7 +42,6 @@ public fun Job.asCompletable(context: CoroutineContext): Completable = rxComplet * * @param context -- the coroutine context from which the resulting maybe is going to be signalled */ -@ExperimentalCoroutinesApi public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMaybe(context) { this@asMaybe.await() } @@ -60,7 +58,6 @@ public fun <T> Deferred<T?>.asMaybe(context: CoroutineContext): Maybe<T> = rxMay * * @param context -- the coroutine context from which the resulting single is going to be signalled */ -@ExperimentalCoroutinesApi public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> = rxSingle(context) { this@asSingle.await() } @@ -75,17 +72,20 @@ public fun <T : Any> Deferred<T>.asSingle(context: CoroutineContext): Single<T> * resulting flow to specify a user-defined value and to control what happens when data is produced faster * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. */ -@ExperimentalCoroutinesApi public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow { val disposableRef = AtomicReference<Disposable>() val observer = object : Observer<T> { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } override fun onNext(t: T) { + /* + * Channel was closed by the downstream, so the exception (if any) + * also was handled by the same downstream + */ try { - sendBlocking(t) - } catch (ignored: Throwable) { // TODO: Replace when this issue is fixed: https://github.com/Kotlin/kotlinx.coroutines/issues/974 - // Is handled by the downstream flow + trySendBlocking(t) + } catch (e: InterruptedException) { + // RxJava interrupts the source } } override fun onError(e: Throwable) { close(e) } @@ -104,7 +104,6 @@ public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow { * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ -@ExperimentalCoroutinesApi public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable<T> = Observable.create { emitter -> /* * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if @@ -137,10 +136,10 @@ public fun <T: Any> Flow<T>.asObservable(context: CoroutineContext = EmptyCorout * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ -@ExperimentalCoroutinesApi public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> = Flowable.fromPublisher(asPublisher(context)) +/** @suppress */ @Suppress("UNUSED") // KT-42513 @JvmOverloads // binary compatibility @JvmName("from") @@ -148,6 +147,7 @@ public fun <T: Any> Flow<T>.asFlowable(context: CoroutineContext = EmptyCoroutin public fun <T: Any> Flow<T>._asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable<T> = asFlowable(context) +/** @suppress */ @Suppress("UNUSED") // KT-42513 @JvmOverloads // binary compatibility @JvmName("from") diff --git a/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt b/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt index 445a6140..9357f283 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxFlowable.kt @@ -28,7 +28,6 @@ import kotlin.coroutines.* * * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect */ -@ExperimentalCoroutinesApi public fun <T: Any> rxFlowable( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope<T>.() -> Unit diff --git a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt index ca1d5b59..12d0197b 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt @@ -40,7 +40,7 @@ private fun <T> rxMaybeInternal( private class RxMaybeCoroutine<T>( parentContext: CoroutineContext, private val subscriber: MaybeEmitter<T> -) : AbstractCoroutine<T>(parentContext, true) { +) : AbstractCoroutine<T>(parentContext, false, true) { override fun onCompleted(value: T) { try { if (value == null) subscriber.onComplete() else subscriber.onSuccess(value) @@ -51,11 +51,12 @@ private class RxMaybeCoroutine<T>( override fun onCancelled(cause: Throwable, handled: Boolean) { try { - if (!subscriber.tryOnError(cause)) { - handleUndeliverableException(cause, context) + if (subscriber.tryOnError(cause)) { + return } } catch (e: Throwable) { - handleUndeliverableException(e, context) + cause.addSuppressed(e) } + handleUndeliverableException(cause, context) } } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt index 7bd07750..57007bbd 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt @@ -12,6 +12,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import kotlin.coroutines.* +import kotlinx.coroutines.internal.* /** * Creates cold [observable][Observable] that will run a given [block] in a coroutine. @@ -29,7 +30,6 @@ import kotlin.coroutines.* * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. */ -@ExperimentalCoroutinesApi public fun <T : Any> rxObservable( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope<T>.() -> Unit @@ -54,39 +54,35 @@ private const val OPEN = 0 // open channel, still working private const val CLOSED = -1 // closed, but have not signalled onCompleted/onError yet private const val SIGNALLED = -2 // already signalled subscriber onCompleted/onError -private class RxObservableCoroutine<T: Any>( +private class RxObservableCoroutine<T : Any>( parentContext: CoroutineContext, private val subscriber: ObservableEmitter<T> -) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> { +) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T>, SelectClause2<T, SendChannel<T>> { override val channel: SendChannel<T> get() = this - // Mutex is locked when while subscriber.onXXX is being invoked + // Mutex is locked while subscriber.onXXX is being invoked private val mutex = Mutex() private val _signal = atomic(OPEN) - override val isClosedForSend: Boolean get() = isCompleted - override val isFull: Boolean = mutex.isLocked + override val isClosedForSend: Boolean get() = !isActive override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause) override fun invokeOnClose(handler: (Throwable?) -> Unit) = throw UnsupportedOperationException("RxObservableCoroutine doesn't support invokeOnClose") - override fun offer(element: T): Boolean { - if (!mutex.tryLock()) return false - doLockedNext(element) - return true - } + override fun trySend(element: T): ChannelResult<Unit> = + if (!mutex.tryLock()) { + ChannelResult.failure() + } else { + when (val throwable = doLockedNext(element)) { + null -> ChannelResult.success(Unit) + else -> ChannelResult.closed(throwable) + } + } public override suspend fun send(element: T) { - // fast-path -- try send without suspension - if (offer(element)) return - // slow-path does suspend - return sendSuspend(element) - } - - private suspend fun sendSuspend(element: T) { mutex.lock() - doLockedNext(element) + doLockedNext(element)?.let { throw it } } override val onSend: SelectClause2<T, SendChannel<T>> @@ -94,30 +90,39 @@ private class RxObservableCoroutine<T: Any>( // registerSelectSend @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE") - override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) { + override fun <R> registerSelectClause2( + select: SelectInstance<R>, + element: T, + block: suspend (SendChannel<T>) -> R + ) { mutex.onLock.registerSelectClause2(select, null) { - doLockedNext(element) + doLockedNext(element)?.let { throw it } block(this) } } // assert: mutex.isLocked() - private fun doLockedNext(elem: T) { + private fun doLockedNext(elem: T): Throwable? { // check if already closed for send if (!isActive) { doLockedSignalCompleted(completionCause, completionCauseHandled) - throw getCancellationException() + return getCancellationException() } // notify subscriber try { subscriber.onNext(elem) } catch (e: Throwable) { - // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it - // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery, - // this failure is essentially equivalent to a failure of a child coroutine. - cancelCoroutine(e) - mutex.unlock() - throw e + val cause = UndeliverableException(e) + val causeDelivered = close(cause) + unlockAndCheckCompleted() + return if (causeDelivered) { + // `cause` is the reason this channel is closed + cause + } else { + // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception. + handleUndeliverableException(cause, context) + getCancellationException() + } } /* * There is no sense to check for `isActive` before doing `unlock`, because cancellation/completion might @@ -126,6 +131,7 @@ private class RxObservableCoroutine<T: Any>( * We have to recheck `isCompleted` after `unlock` anyway. */ unlockAndCheckCompleted() + return null } private fun unlockAndCheckCompleted() { @@ -139,33 +145,31 @@ private class RxObservableCoroutine<T: Any>( private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) { // cancellation failures try { - if (_signal.value >= CLOSED) { - _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) + if (_signal.value == SIGNALLED) + return + _signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) + @Suppress("INVISIBLE_MEMBER") + val unwrappedCause = cause?.let { unwrap(it) } + if (unwrappedCause == null) { try { - if (cause != null && cause !is CancellationException) { - /* - * Reactive frameworks have two types of exceptions: regular and fatal. - * Regular are passed to onError. - * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297). - * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether - * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was - * thrown by subscriber or upstream). - * To make behaviour consistent and least surprising, we always handle fatal exceptions - * by coroutines machinery, anyway, they should not be present in regular program flow, - * thus our goal here is just to expose it as soon as possible. - */ - subscriber.tryOnError(cause) - if (!handled && cause.isFatal()) { - handleUndeliverableException(cause, context) - } - } - else { - subscriber.onComplete() - } - } catch (e: Throwable) { - // Unhandled exception (cannot handle in other way, since we are already complete) + subscriber.onComplete() + } catch (e: Exception) { handleUndeliverableException(e, context) } + } else if (unwrappedCause is UndeliverableException && !handled) { + /** Such exceptions are not reported to `onError`, as, according to the reactive specifications, + * exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already + * cancelled. */ + handleUndeliverableException(cause, context) + } else if (unwrappedCause !== getCancellationException() || !subscriber.isDisposed) { + try { + /** If the subscriber is already in a terminal state, the error will be signalled to + * `RxJavaPlugins.onError`. */ + subscriber.onError(cause) + } catch (e: Exception) { + cause.addSuppressed(e) + handleUndeliverableException(cause, context) + } } } finally { mutex.unlock() @@ -187,9 +191,3 @@ private class RxObservableCoroutine<T: Any>( } } -internal fun Throwable.isFatal() = try { - Exceptions.throwIfFatal(this) // Rx-consistent behaviour without hardcode - false -} catch (e: Throwable) { - true -} diff --git a/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt index f4d07fb4..e7678f0d 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt @@ -39,7 +39,7 @@ private fun <T : Any> rxSingleInternal( private class RxSingleCoroutine<T: Any>( parentContext: CoroutineContext, private val subscriber: SingleEmitter<T> -) : AbstractCoroutine<T>(parentContext, true) { +) : AbstractCoroutine<T>(parentContext, false, true) { override fun onCompleted(value: T) { try { subscriber.onSuccess(value) @@ -50,11 +50,12 @@ private class RxSingleCoroutine<T: Any>( override fun onCancelled(cause: Throwable, handled: Boolean) { try { - if (!subscriber.tryOnError(cause)) { - handleUndeliverableException(cause, context) + if (subscriber.tryOnError(cause)) { + return } } catch (e: Throwable) { - handleUndeliverableException(e, context) + cause.addSuppressed(e) } + handleUndeliverableException(cause, context) } } |