aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-reactive/src/Publish.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/src/Publish.kt')
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/Publish.kt205
1 files changed, 115 insertions, 90 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
index 35878b04..4928a743 100644
--- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt
@@ -1,9 +1,6 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-
-@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
-
package kotlinx.coroutines.reactive
import kotlinx.atomicfu.*
@@ -13,27 +10,28 @@ import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import org.reactivestreams.*
import kotlin.coroutines.*
-import kotlin.internal.LowPriorityInOverloadResolution
/**
- * Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
+ * Creates a cold reactive [Publisher] that runs a given [block] in a coroutine.
+ *
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
- * Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
- * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
- * if coroutine throws an exception or closes channel with a cause.
- * Unsubscribing cancels running coroutine.
+ * The coroutine emits (via [Subscriber.onNext]) values with [send][ProducerScope.send],
+ * completes (via [Subscriber.onComplete]) when the coroutine completes or channel is explicitly closed, and emits
+ * errors (via [Subscriber.onError]) if the coroutine throws an exception or closes channel with a cause.
+ * Unsubscribing cancels the running coroutine.
*
- * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
- * `onNext` is not invoked concurrently.
+ * Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to
+ * ensure that [onNext][Subscriber.onNext] is not invoked concurrently.
*
* Coroutine context can be specified with [context] argument.
- * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
- * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
+ * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is
+ * used.
*
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
* to cancellation and error handling may change in the future.
+ *
+ * @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
*/
-@ExperimentalCoroutinesApi
public fun <T> publish(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
@@ -43,17 +41,6 @@ public fun <T> publish(
return publishInternal(GlobalScope, context, DEFAULT_HANDLER, block)
}
-@Deprecated(
- message = "CoroutineScope.publish is deprecated in favour of top-level publish",
- level = DeprecationLevel.ERROR,
- replaceWith = ReplaceWith("publish(context, block)")
-) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring
-@LowPriorityInOverloadResolution
-public fun <T> CoroutineScope.publish(
- context: CoroutineContext = EmptyCoroutineContext,
- @BuilderInference block: suspend ProducerScope<T>.() -> Unit
-): Publisher<T> = publishInternal(this, context, DEFAULT_HANDLER ,block)
-
/** @suppress For internal use from other reactive integration modules only */
@InternalCoroutinesApi
public fun <T> publishInternal(
@@ -74,13 +61,14 @@ private const val CLOSED = -1L // closed, but have not signalled onCompleted/
private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError
private val DEFAULT_HANDLER: (Throwable, CoroutineContext) -> Unit = { t, ctx -> if (t !is CancellationException) handleCoroutineException(ctx, t) }
+/** @suppress */
@Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE")
@InternalCoroutinesApi
public class PublisherCoroutine<in T>(
parentContext: CoroutineContext,
private val subscriber: Subscriber<T>,
private val exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit
-) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
+) : AbstractCoroutine<Unit>(parentContext, false, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
override val channel: SendChannel<T> get() = this
// Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked
@@ -88,30 +76,26 @@ public class PublisherCoroutine<in T>(
private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED)
@Volatile
- private var cancelled = false // true when Subscription.cancel() is invoked
+ private var cancelled = false // true after Subscription.cancel() is invoked
- override val isClosedForSend: Boolean get() = isCompleted
- override val isFull: Boolean = mutex.isLocked
+ override val isClosedForSend: Boolean get() = !isActive
override fun close(cause: Throwable?): Boolean = cancelCoroutine(cause)
override fun invokeOnClose(handler: (Throwable?) -> Unit): Nothing =
throw UnsupportedOperationException("PublisherCoroutine doesn't support invokeOnClose")
- override fun offer(element: T): Boolean {
- if (!mutex.tryLock()) return false
- doLockedNext(element)
- return true
- }
+ override fun trySend(element: T): ChannelResult<Unit> =
+ if (!mutex.tryLock()) {
+ ChannelResult.failure()
+ } else {
+ when (val throwable = doLockedNext(element)) {
+ null -> ChannelResult.success(Unit)
+ else -> ChannelResult.closed(throwable)
+ }
+ }
public override suspend fun send(element: T) {
- // fast-path -- try send without suspension
- if (offer(element)) return
- // slow-path does suspend
- return sendSuspend(element)
- }
-
- private suspend fun sendSuspend(element: T) {
mutex.lock()
- doLockedNext(element)
+ doLockedNext(element)?.let { throw it }
}
override val onSend: SelectClause2<T, SendChannel<T>>
@@ -121,13 +105,13 @@ public class PublisherCoroutine<in T>(
@Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE")
override fun <R> registerSelectClause2(select: SelectInstance<R>, element: T, block: suspend (SendChannel<T>) -> R) {
mutex.onLock.registerSelectClause2(select, null) {
- doLockedNext(element)
+ doLockedNext(element)?.let { throw it }
block(this)
}
}
/*
- * This code is not trivial because of the two properties:
+ * This code is not trivial because of the following properties:
* 1. It ensures conformance to the reactive specification that mandates that onXXX invocations should not
* be concurrent. It uses Mutex to protect all onXXX invocation and ensure conformance even when multiple
* coroutines are invoking `send` function.
@@ -136,27 +120,61 @@ public class PublisherCoroutine<in T>(
* globally-scoped coroutine that is invoking `send` outside of this context. Without extra precaution this may
* lead to `onNext` that is concurrent with `onComplete/onError`, so that is why signalling for
* `onComplete/onError` is also done under the same mutex.
+ * 3. The reactive specification forbids emitting more elements than requested, so `onNext` is forbidden until the
+ * subscriber actually requests some elements. This is implemented by the mutex being locked when emitting
+ * elements is not permitted (`_nRequested.value == 0`).
*/
- // assert: mutex.isLocked()
- private fun doLockedNext(elem: T) {
- // check if already closed for send, note that isActive becomes false as soon as cancel() is invoked,
- // because the job is cancelled, so this check also ensure conformance to the reactive specification's
- // requirement that after cancellation requested we don't call onXXX
+ /**
+ * Attempts to emit a value to the subscriber and, if back-pressure permits this, unlock the mutex.
+ *
+ * Requires that the caller has locked the mutex before this invocation.
+ *
+ * If the channel is closed, returns the corresponding [Throwable]; otherwise, returns `null` to denote success.
+ *
+ * @throws NullPointerException if the passed element is `null`
+ */
+ private fun doLockedNext(elem: T): Throwable? {
+ if (elem == null) {
+ unlockAndCheckCompleted()
+ throw NullPointerException("Attempted to emit `null` inside a reactive publisher")
+ }
+ /** This guards against the case when the caller of this function managed to lock the mutex not because some
+ * elements were requested--and thus it is permitted to call `onNext`--but because the channel was closed.
+ *
+ * It may look like there is a race condition here between `isActive` and a concurrent cancellation, but it's
+ * okay for a cancellation to happen during `onNext`, as the reactive spec only requires that we *eventually*
+ * stop signalling the subscriber. */
if (!isActive) {
unlockAndCheckCompleted()
- throw getCancellationException()
+ return getCancellationException()
}
- // notify subscriber
+ // notify the subscriber
try {
subscriber.onNext(elem)
- } catch (e: Throwable) {
- // If onNext fails with exception, then we cancel coroutine (with this exception) and then rethrow it
- // to abort the corresponding send/offer invocation. From the standpoint of coroutines machinery,
- // this failure is essentially equivalent to a failure of a child coroutine.
- cancelCoroutine(e)
+ } catch (cause: Throwable) {
+ /** The reactive streams spec forbids the subscribers from throwing from [Subscriber.onNext] unless the
+ * element is `null`, which we check not to be the case. Therefore, we report this exception to the handler
+ * for uncaught exceptions and consider the subscription cancelled, as mandated by
+ * https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2.13.
+ *
+ * Some reactive implementations, like RxJava or Reactor, are known to throw from [Subscriber.onNext] if the
+ * execution encounters an exception they consider to be "fatal", like [VirtualMachineError] or
+ * [ThreadDeath]. Us using the handler for the undeliverable exceptions to signal "fatal" exceptions is
+ * inconsistent with RxJava and Reactor, which attempt to bubble the exception up the call chain as soon as
+ * possible. However, we can't do much better here, as simply throwing from all methods indiscriminately
+ * would violate the contracts we place on them. */
+ cancelled = true
+ val causeDelivered = close(cause)
unlockAndCheckCompleted()
- throw e
+ return if (causeDelivered) {
+ // `cause` is the reason this channel is closed
+ cause
+ } else {
+ // Someone else closed the channel during `onNext`. We report `cause` as an undeliverable exception.
+ exceptionOnCancelHandler(cause, context)
+ getCancellationException()
+ }
}
// now update nRequested
while (true) { // lock-free loop on nRequested
@@ -167,12 +185,13 @@ public class PublisherCoroutine<in T>(
if (_nRequested.compareAndSet(current, updated)) {
if (updated == 0L) {
// return to keep locked due to back-pressure
- return
+ return null
}
break // unlock if updated > 0
}
}
unlockAndCheckCompleted()
+ return null
}
private fun unlockAndCheckCompleted() {
@@ -192,38 +211,31 @@ public class PublisherCoroutine<in T>(
// assert: mutex.isLocked() & isCompleted
private fun doLockedSignalCompleted(cause: Throwable?, handled: Boolean) {
try {
- if (_nRequested.value >= CLOSED) {
- _nRequested.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
- // Specification requires that after cancellation requested we don't call onXXX
- if (cancelled) {
- // If the parent had failed to handle our exception, then we must not lose this exception
- if (cause != null && !handled) exceptionOnCancelHandler(cause, context)
- return
- }
-
+ if (_nRequested.value == SIGNALLED)
+ return
+ _nRequested.value = SIGNALLED // we'll signal onError/onCompleted (the final state, so no CAS needed)
+ // Specification requires that after the cancellation is requested we eventually stop calling onXXX
+ if (cancelled) {
+ // If the parent failed to handle this exception, then we must not lose the exception
+ if (cause != null && !handled) exceptionOnCancelHandler(cause, context)
+ return
+ }
+ if (cause == null) {
try {
- if (cause != null && cause !is CancellationException) {
- /*
- * Reactive frameworks have two types of exceptions: regular and fatal.
- * Regular are passed to onError.
- * Fatal can be passed to onError, but even the standard implementations **can just swallow it** (e.g. see #1297).
- * Such behaviour is inconsistent, leads to silent failures and we can't possibly know whether
- * the cause will be handled by onError (and moreover, it depends on whether a fatal exception was
- * thrown by subscriber or upstream).
- * To make behaviour consistent and least surprising, we always handle fatal exceptions
- * by coroutines machinery, anyway, they should not be present in regular program flow,
- * thus our goal here is just to expose it as soon as possible.
- */
- subscriber.onError(cause)
- if (!handled && cause.isFatal()) {
- exceptionOnCancelHandler(cause, context)
- }
- } else {
- subscriber.onComplete()
- }
+ subscriber.onComplete()
} catch (e: Throwable) {
handleCoroutineException(context, e)
}
+ } else {
+ try {
+ // This can't be the cancellation exception from `cancel`, as then `cancelled` would be `true`.
+ subscriber.onError(cause)
+ } catch (e: Throwable) {
+ if (e !== cause) {
+ cause.addSuppressed(e)
+ }
+ handleCoroutineException(context, cause)
+ }
}
} finally {
mutex.unlock()
@@ -232,13 +244,13 @@ public class PublisherCoroutine<in T>(
override fun request(n: Long) {
if (n <= 0) {
- // Specification requires IAE for n <= 0
+ // Specification requires to call onError with IAE for n <= 0
cancelCoroutine(IllegalArgumentException("non-positive subscription request $n"))
return
}
while (true) { // lock-free loop for nRequested
val cur = _nRequested.value
- if (cur < 0) return // already closed for send, ignore requests
+ if (cur < 0) return // already closed for send, ignore requests, as mandated by the reactive streams spec
var upd = cur + n
if (upd < 0 || n == Long.MAX_VALUE)
upd = Long.MAX_VALUE
@@ -246,6 +258,11 @@ public class PublisherCoroutine<in T>(
if (_nRequested.compareAndSet(cur, upd)) {
// unlock the mutex when we don't have back-pressure anymore
if (cur == 0L) {
+ /** In a sense, after a successful CAS, it is this invocation, not the coroutine itself, that owns
+ * the lock, given that `upd` is necessarily strictly positive. Thus, no other operation has the
+ * right to lower the value on [_nRequested], it can only grow or become [CLOSED]. Therefore, it is
+ * impossible for any other operations to assume that they own the lock without actually acquiring
+ * it. */
unlockAndCheckCompleted()
}
return
@@ -286,6 +303,14 @@ public class PublisherCoroutine<in T>(
cancelled = true
super.cancel(null)
}
-
- private fun Throwable.isFatal() = this is VirtualMachineError || this is ThreadDeath || this is LinkageError
}
+
+@Deprecated(
+ message = "CoroutineScope.publish is deprecated in favour of top-level publish",
+ level = DeprecationLevel.HIDDEN,
+ replaceWith = ReplaceWith("publish(context, block)")
+) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring
+public fun <T> CoroutineScope.publish(
+ context: CoroutineContext = EmptyCoroutineContext,
+ @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Publisher<T> = publishInternal(this, context, DEFAULT_HANDLER, block)