diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/src/Await.kt')
-rw-r--r-- | reactive/kotlinx-coroutines-reactive/src/Await.kt | 241 |
1 files changed, 179 insertions, 62 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/src/Await.kt b/reactive/kotlinx-coroutines-reactive/src/Await.kt index e9f69550..fef1205a 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Await.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Await.kt @@ -4,120 +4,170 @@ package kotlinx.coroutines.reactive -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.Job -import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.* import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import org.reactivestreams.Subscription -import java.util.* +import java.lang.IllegalStateException import kotlin.coroutines.* /** - * Awaits for the first value from the given publisher without blocking a thread and - * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if + * the publisher has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. * - * @throws NoSuchElementException if publisher does not emit any value + * @throws NoSuchElementException if the publisher does not emit any value */ public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST) /** - * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given publisher, or returns the [default] value if none is emitted, without blocking + * the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding + * exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. */ public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default) /** - * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given publisher, or returns `null` if none is emitted, without blocking the thread, + * and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. */ public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT) /** - * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given publisher, or calls [defaultValue] to get a value if none is emitted, without + * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the + * corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. */ public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue() /** - * Awaits for the last value from the given publisher without blocking a thread and - * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * Awaits the last value from the given publisher without blocking the thread and + * returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. * - * @throws NoSuchElementException if publisher does not emit any value + * @throws NoSuchElementException if the publisher does not emit any value */ public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST) /** - * Awaits for the single value from the given publisher without blocking a thread and - * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or, + * if this publisher has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. * - * @throws NoSuchElementException if publisher does not emit any value - * @throws IllegalArgumentException if publisher emits more than one value + * @throws NoSuchElementException if the publisher does not emit any value + * @throws IllegalArgumentException if the publisher emits more than one value */ public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE) /** - * Awaits for the single value from the given publisher or the [default] value if none is emitted without blocking a thread and - * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * Awaits the single value from the given publisher, or returns the [default] value if none is emitted, without + * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the + * corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. * - * @throws NoSuchElementException if publisher does not emit any value - * @throws IllegalArgumentException if publisher emits more than one value + * ### Deprecation + * + * This method is deprecated because the conventions established in Kotlin mandate that an operation with the name + * `awaitSingleOrDefault` returns the default value instead of throwing in case there is an error; however, this would + * also mean that this method would return the default value if there are *too many* values. This could be confusing to + * those who expect this function to validate that there is a single element or none at all emitted, and cases where + * there are no elements are indistinguishable from those where there are too many, though these cases have different + * meaning. + * + * @throws NoSuchElementException if the publisher does not emit any value + * @throws IllegalArgumentException if the publisher emits more than one value + * + * @suppress */ +@Deprecated( + message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " + + "Please consider using awaitFirstOrDefault().", + level = DeprecationLevel.WARNING +) // Warning since 1.5, error in 1.6, hidden in 1.7 public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default) /** - * Awaits for the single value from the given publisher or `null` value if none is emitted without blocking a thread and - * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or, if + * this publisher has produced an error, throws the corresponding exception. If more than one value or none were + * produced by the publisher, `null` is returned. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. + * + * ### Deprecation * - * @throws NoSuchElementException if publisher does not emit any value - * @throws IllegalArgumentException if publisher emits more than one value + * This method is deprecated because the conventions established in Kotlin mandate that an operation with the name + * `awaitSingleOrNull` returns `null` instead of throwing in case there is an error; however, this would + * also mean that this method would return `null` if there are *too many* values. This could be confusing to + * those who expect this function to validate that there is a single element or none at all emitted, and cases where + * there are no elements are indistinguishable from those where there are too many, though these cases have different + * meaning. + * + * @throws IllegalArgumentException if the publisher emits more than one value + * @suppress */ -public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T = awaitOne(Mode.SINGLE_OR_DEFAULT) +@Deprecated( + message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " + + "There is a specialized version for Reactor's Mono, please use that where applicable. " + + "Alternatively, please consider using awaitFirstOrNull().", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull()", "kotlinx.coroutines.reactor") +) // Warning since 1.5, error in 1.6, hidden in 1.7 +public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T? = awaitOne(Mode.SINGLE_OR_DEFAULT) /** - * Awaits for the single value from the given publisher or call [defaultValue] to get a value if none is emitted without blocking a thread and - * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * Awaits the single value from the given publisher, or calls [defaultValue] to get a value if none is emitted, without + * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the + * corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. + * + * ### Deprecation + * + * This method is deprecated because the conventions established in Kotlin mandate that an operation with the name + * `awaitSingleOrElse` returns the calculated value instead of throwing in case there is an error; however, this would + * also mean that this method would return the calculated value if there are *too many* values. This could be confusing + * to those who expect this function to validate that there is a single element or none at all emitted, and cases where + * there are no elements are indistinguishable from those where there are too many, though these cases have different + * meaning. * - * @throws NoSuchElementException if publisher does not emit any value - * @throws IllegalArgumentException if publisher emits more than one value + * @throws IllegalArgumentException if the publisher emits more than one value + * @suppress */ -public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T = awaitOne(Mode.SINGLE_OR_DEFAULT) ?: defaultValue() +@Deprecated( + message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " + + "Please consider using awaitFirstOrElse().", + level = DeprecationLevel.WARNING +) // Warning since 1.5, error in 1.6, hidden in 1.7 +public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T = + awaitOne(Mode.SINGLE_OR_DEFAULT) ?: defaultValue() // ------------------------ private ------------------------ @@ -134,31 +184,61 @@ private suspend fun <T> Publisher<T>.awaitOne( mode: Mode, default: T? = null ): T = suspendCancellableCoroutine { cont -> + /* This implementation must obey + https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2-subscriber-code + The numbers of rules are taken from there. */ injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> { - private lateinit var subscription: Subscription + // It is unclear whether 2.13 implies (T: Any), but if so, it seems that we don't break anything by not adhering + private var subscription: Subscription? = null private var value: T? = null private var seenValue = false + private var inTerminalState = false override fun onSubscribe(sub: Subscription) { + /** cancelling the new subscription due to rule 2.5, though the publisher would either have to + * subscribe more than once, which would break 2.12, or leak this [Subscriber]. */ + if (subscription != null) { + sub.cancel() + return + } subscription = sub cont.invokeOnCancellation { sub.cancel() } - sub.request(if (mode == Mode.FIRST) 1 else Long.MAX_VALUE) + sub.request(if (mode == Mode.FIRST || mode == Mode.FIRST_OR_DEFAULT) 1 else Long.MAX_VALUE) } override fun onNext(t: T) { + val sub = subscription.let { + if (it == null) { + /** Enforce rule 1.9: expect [Subscriber.onSubscribe] before any other signals. */ + handleCoroutineException(cont.context, + IllegalStateException("'onNext' was called before 'onSubscribe'")) + return + } else { + it + } + } + if (inTerminalState) { + gotSignalInTerminalStateException(cont.context, "onNext") + return + } when (mode) { Mode.FIRST, Mode.FIRST_OR_DEFAULT -> { - if (!seenValue) { - seenValue = true - subscription.cancel() - cont.resume(t) + if (seenValue) { + moreThanOneValueProvidedException(cont.context, mode) + return } + seenValue = true + sub.cancel() + cont.resume(t) } Mode.LAST, Mode.SINGLE, Mode.SINGLE_OR_DEFAULT -> { if ((mode == Mode.SINGLE || mode == Mode.SINGLE_OR_DEFAULT) && seenValue) { - subscription.cancel() - if (cont.isActive) + sub.cancel() + /* the check for `cont.isActive` is needed in case `sub.cancel() above calls `onComplete` or + `onError` on its own. */ + if (cont.isActive) { cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode")) + } } else { value = t seenValue = true @@ -169,8 +249,16 @@ private suspend fun <T> Publisher<T>.awaitOne( @Suppress("UNCHECKED_CAST") override fun onComplete() { + if (!tryEnterTerminalState("onComplete")) { + return + } if (seenValue) { - if (cont.isActive) cont.resume(value as T) + /* the check for `cont.isActive` is needed because, otherwise, if the publisher doesn't acknowledge the + call to `cancel` for modes `SINGLE*` when more than one value was seen, it may call `onComplete`, and + here `cont.resume` would fail. */ + if (mode != Mode.FIRST_OR_DEFAULT && mode != Mode.FIRST && cont.isActive) { + cont.resume(value as T) + } return } when { @@ -178,14 +266,43 @@ private suspend fun <T> Publisher<T>.awaitOne( cont.resume(default as T) } cont.isActive -> { + // the check for `cont.isActive` is just a slight optimization and doesn't affect correctness cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode")) } } } override fun onError(e: Throwable) { - cont.resumeWithException(e) + if (tryEnterTerminalState("onError")) { + cont.resumeWithException(e) + } + } + + /** + * Enforce rule 2.4: assume that the [Publisher] is in a terminal state after [onError] or [onComplete]. + */ + private fun tryEnterTerminalState(signalName: String): Boolean { + if (inTerminalState) { + gotSignalInTerminalStateException(cont.context, signalName) + return false + } + inTerminalState = true + return true } }) } +/** + * Enforce rule 2.4 (detect publishers that don't respect rule 1.7): don't process anything after a terminal + * state was reached. + */ +private fun gotSignalInTerminalStateException(context: CoroutineContext, signalName: String) = + handleCoroutineException(context, + IllegalStateException("'$signalName' was called after the publisher already signalled being in a terminal state")) + +/** + * Enforce rule 1.1: it is invalid for a publisher to provide more values than requested. + */ +private fun moreThanOneValueProvidedException(context: CoroutineContext, mode: Mode) = + handleCoroutineException(context, + IllegalStateException("Only a single value was requested in '$mode', but the publisher provided more")) |