diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/src/Publish.kt')
-rw-r--r-- | reactive/kotlinx-coroutines-reactive/src/Publish.kt | 205 |
1 files changed, 115 insertions, 90 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index 35878b04..4928a743 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -1,9 +1,6 @@ /* * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ - -@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - package kotlinx.coroutines.reactive import kotlinx.atomicfu.* @@ -13,27 +10,28 @@ import kotlinx.coroutines.selects.* import kotlinx.coroutines.sync.* import org.reactivestreams.* import kotlin.coroutines.* -import kotlin.internal.LowPriorityInOverloadResolution /** - * Creates cold reactive [Publisher] that runs a given [block] in a coroutine. + * Creates a cold reactive [Publisher] that runs a given [block] in a coroutine. + * * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context]. - * Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete]) - * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError]) - * if coroutine throws an exception or closes channel with a cause. - * Unsubscribing cancels running coroutine. + * The coroutine emits (via [Subscriber.onNext]) values with [send][ProducerScope.send], + * completes (via [Subscriber.onComplete]) when the coroutine completes or channel is explicitly closed, and emits + * errors (via [Subscriber.onError]) if the coroutine throws an exception or closes channel with a cause. + * Unsubscribing cancels the running coroutine. * - * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that - * `onNext` is not invoked concurrently. + * Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to + * ensure that [onNext][Subscriber.onNext] is not invoked concurrently. * * Coroutine context can be specified with [context] argument. - * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. - * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. + * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is + * used. * * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect * to cancellation and error handling may change in the future. + * + * @throws IllegalArgumentException if the provided [context] contains a [Job] instance. */ -@ExperimentalCoroutinesApi public fun <T> publish( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope<T>.() -> Unit @@ -43,17 +41,6 @@ public fun <T> publish( return publishInternal(GlobalScope, context, DEFAULT_HANDLER, block) } -@Deprecated( - message = "CoroutineScope.publish is deprecated in favour of top-level publish", - level = DeprecationLevel.ERROR, - replaceWith = ReplaceWith("publish(context, block)") -) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring -@LowPriorityInOverloadResolution -public fun <T> CoroutineScope.publish( - context: CoroutineContext = EmptyCoroutineContext, - @BuilderInference block: suspend ProducerScope<T>.() -> Unit -): Publisher<T> = publishInternal(this, context, DEFAULT_HANDLER ,block) - /** @suppress For internal use from other reactive integration modules only */ @InternalCoroutinesApi public fun <T> publishInternal( @@ -74,13 +61,14 @@ private const val CLOSED = -1L // closed, but have not signalled onCompleted/ private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError private val DEFAULT_HANDLER: (Throwable, CoroutineContext) -> Unit = { t, ctx -> if (t !is CancellationException) handleCoroutineException(ctx, t) } +/** @suppress */ @Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE") @InternalCoroutinesApi public class PublisherCoroutine<in T>( parentContext: CoroutineContext, private val subscriber: Subscriber<T>, private val exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit -) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> { +) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> { override val channel: SendChannel<T> get() = this // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked @@ -88,30 +76,26 @@ public class PublisherCoroutine<in T>( private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED) @Volatile - private var cancelled = false // true when Subscription.cancel() is invoked + private var cancelled = false // true after Subscription.cancel() is invoked - override val isClosedForSend: Boolean get() = isCompleted - override val isFull: Boolean = mutex.isLocked + override val isClosedForSend: Boolean get() = !isActive override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause) override fun invokeOnClose(handler: (Throwable?) -> Unit): Nothing = throw UnsupportedOperationException("PublisherCoroutine doesn't support invokeOnClose") - override fun offer(element: T): Boolean { - if (!mutex.tryLock()) return false - doLockedNext(element) - return true - } + override fun trySend(element: T): ChannelResult<Unit> = + if (!mutex.tryLock()) { + ChannelResult.failure() + } else { + when (val throwable = doLockedNext(element)) { + null -> ChannelResult.success(Unit) + else -> ChannelResult.closed(throwable) + } + } public override suspend fun send(element: T) { - // fast-path -- try send without suspension - if (offer(element)) return - // slow-path does suspend - return sendSuspend(element) - } - - private suspend fun sendSuspend(element: T) { mutex.lock() - doLockedNext(element) + doLockedNext(element)?.let { throw it } } override val onSend: SelectClause2<T, SendChannel<T>> @@ -121,13 +105,13 @@ public class PublisherCoroutine<in T>( @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE") override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) { mutex.onLock.registerSelectClause2(select, null) { - doLockedNext(element) + doLockedNext(element)?.let { throw it } block(this) } } /* - * This code is not trivial because of the two properties: + * This code is not trivial because of the following properties: * 1. It ensures conformance to the reactive specification that mandates that onXXX invocations should not * be concurrent. It uses Mutex to protect all onXXX invocation and ensure conformance even when multiple * coroutines are invoking `send` function. @@ -136,27 +120,61 @@ public class PublisherCoroutine<in T>( * globally-scoped coroutine that is invoking `send` outside of this context. Without extra precaution this may * lead to `onNext` that is concurrent with `onComplete/onError`, so that is why signalling for * `onComplete/onError` is also done under the same mutex. + * 3. The reactive specification forbids emitting more elements than requested, so `onNext` is forbidden until the + * subscriber actually requests some elements. This is implemented by the mutex being locked when emitting + * elements is not permitted (`_nRequested.value == 0`). */ - // assert: mutex.isLocked() - private fun doLockedNext(elem: T) { - // check if already closed for send, note that isActive becomes false as soon as cancel() is invoked, - // because the job is cancelled, so this check also ensure conformance to the reactive specification's - // requirement that after cancellation requested we don't call onXXX + /** + * Attempts to emit a value to the subscriber and, if back-pressure permits this, unlock the mutex. + * + * Requires that the caller has locked the mutex before this invocation. + * + * If the channel is closed, returns the corresponding [Throwable]; otherwise, returns `null` to denote success. + * + * @throws NullPointerException if the passed element is `null` + */ + private fun doLockedNext(elem: T): Throwable? { + if (elem == null) { + unlockAndCheckCompleted() + throw NullPointerException("Attempted to emit `null` inside a reactive publisher") + } + /** This guards against the case when the caller of this function managed to lock the mutex not because some + * elements were requested--and thus it is permitted to call `onNext`--but because the channel was closed. + * + * It may look like there is a race condition here between `isActive` and a concurrent cancellation, but it's + * okay for a cancellation to happen during `onNext`, as the reactive spec only requires that we *eventually* + * stop signalling the subscriber. */ if (!isActive) { unlockAndCheckCompleted() - throw getCancellationException() + return getCancellationException() } - // notify subscriber + // notify the subscriber try { subscriber.onNext(elem) - } catch (e: Throwable) { - // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it - // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery, - // this failure is essentially equivalent to a failure of a child coroutine. - cancelCoroutine(e) + } catch (cause: Throwable) { + /** The reactive streams spec forbids the subscribers from throwing from [Subscriber.onNext] unless the + * element is `null`, which we check not to be the case. Therefore, we report this exception to the handler + * for uncaught exceptions and consider the subscription cancelled, as mandated by + * https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2.13. + * + * Some reactive implementations, like RxJava or Reactor, are known to throw from [Subscriber.onNext] if the + * execution encounters an exception they consider to be "fatal", like [VirtualMachineError] or + * [ThreadDeath]. Us using the handler for the undeliverable exceptions to signal "fatal" exceptions is + * inconsistent with RxJava and Reactor, which attempt to bubble the exception up the call chain as soon as + * possible. However, we can't do much better here, as simply throwing from all methods indiscriminately + * would violate the contracts we place on them. */ + cancelled = true + val causeDelivered = close(cause) unlockAndCheckCompleted() - throw e + return if (causeDelivered) { + // `cause` is the reason this channel is closed + cause + } else { + // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception. + exceptionOnCancelHandler(cause, context) + getCancellationException() + } } // now update nRequested while (true) { // lock-free loop on nRequested @@ -167,12 +185,13 @@ public class PublisherCoroutine<in T>( if (_nRequested.compareAndSet(current, updated)) { if (updated == 0L) { // return to keep locked due to back-pressure - return + return null } break // unlock if updated > 0 } } unlockAndCheckCompleted() + return null } private fun unlockAndCheckCompleted() { @@ -192,38 +211,31 @@ public class PublisherCoroutine<in T>( // assert: mutex.isLocked() & isCompleted private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) { try { - if (_nRequested.value >= CLOSED) { - _nRequested.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed) - // Specification requires that after cancellation requested we don't call onXXX - if (cancelled) { - // If the parent had failed to handle our exception, then we must not lose this exception - if (cause != null && !handled) exceptionOnCancelHandler(cause, context) - return - } - + if (_nRequested.value == SIGNALLED) + return + _nRequested.value = SIGNALLED // we'll signal onError/onCompleted (the final state, so no CAS needed) + // Specification requires that after the cancellation is requested we eventually stop calling onXXX + if (cancelled) { + // If the parent failed to handle this exception, then we must not lose the exception + if (cause != null && !handled) exceptionOnCancelHandler(cause, context) + return + } + if (cause == null) { try { - if (cause != null && cause !is CancellationException) { - /* - * Reactive frameworks have two types of exceptions: regular and fatal. - * Regular are passed to onError. - * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297). - * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether - * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was - * thrown by subscriber or upstream). - * To make behaviour consistent and least surprising, we always handle fatal exceptions - * by coroutines machinery, anyway, they should not be present in regular program flow, - * thus our goal here is just to expose it as soon as possible. - */ - subscriber.onError(cause) - if (!handled && cause.isFatal()) { - exceptionOnCancelHandler(cause, context) - } - } else { - subscriber.onComplete() - } + subscriber.onComplete() } catch (e: Throwable) { handleCoroutineException(context, e) } + } else { + try { + // This can't be the cancellation exception from `cancel`, as then `cancelled` would be `true`. + subscriber.onError(cause) + } catch (e: Throwable) { + if (e !== cause) { + cause.addSuppressed(e) + } + handleCoroutineException(context, cause) + } } } finally { mutex.unlock() @@ -232,13 +244,13 @@ public class PublisherCoroutine<in T>( override fun request(n: Long) { if (n <= 0) { - // Specification requires IAE for n <= 0 + // Specification requires to call onError with IAE for n <= 0 cancelCoroutine(IllegalArgumentException("non-positive subscription request $n")) return } while (true) { // lock-free loop for nRequested val cur = _nRequested.value - if (cur < 0) return // already closed for send, ignore requests + if (cur < 0) return // already closed for send, ignore requests, as mandated by the reactive streams spec var upd = cur + n if (upd < 0 || n == Long.MAX_VALUE) upd = Long.MAX_VALUE @@ -246,6 +258,11 @@ public class PublisherCoroutine<in T>( if (_nRequested.compareAndSet(cur, upd)) { // unlock the mutex when we don't have back-pressure anymore if (cur == 0L) { + /** In a sense, after a successful CAS, it is this invocation, not the coroutine itself, that owns + * the lock, given that `upd` is necessarily strictly positive. Thus, no other operation has the + * right to lower the value on [_nRequested], it can only grow or become [CLOSED]. Therefore, it is + * impossible for any other operations to assume that they own the lock without actually acquiring + * it. */ unlockAndCheckCompleted() } return @@ -286,6 +303,14 @@ public class PublisherCoroutine<in T>( cancelled = true super.cancel(null) } - - private fun Throwable.isFatal() = this is VirtualMachineError || this is ThreadDeath || this is LinkageError } + +@Deprecated( + message = "CoroutineScope.publish is deprecated in favour of top-level publish", + level = DeprecationLevel.HIDDEN, + replaceWith = ReplaceWith("publish(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring +public fun <T> CoroutineScope.publish( + context: CoroutineContext = EmptyCoroutineContext, + @BuilderInference block: suspend ProducerScope<T>.() -> Unit +): Publisher<T> = publishInternal(this, context, DEFAULT_HANDLER, block) |