diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-reactor/src/Flux.kt')
-rw-r--r-- | reactive/kotlinx-coroutines-reactor/src/Flux.kt | 86 |
1 files changed, 51 insertions, 35 deletions
diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt index 8d4f9cc9..1e408d83 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt @@ -1,10 +1,7 @@ - /* * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ -@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - package kotlinx.coroutines.reactor import kotlinx.coroutines.* @@ -15,25 +12,23 @@ import reactor.core.* import reactor.core.publisher.* import reactor.util.context.* import kotlin.coroutines.* -import kotlin.internal.* /** - * Creates cold reactive [Flux] that runs a given [block] in a coroutine. + * Creates a cold reactive [Flux] that runs the given [block] in a coroutine. * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context]. - * Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete]) - * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError]) - * if coroutine throws an exception or closes channel with a cause. - * Unsubscribing cancels running coroutine. + * The coroutine emits ([Subscriber.onNext]) values with [send][ProducerScope.send], completes ([Subscriber.onComplete]) + * when the coroutine completes, or, in case the coroutine throws an exception or the channel is closed, + * emits the error ([Subscriber.onError]) and closes the channel with the cause. + * Unsubscribing cancels the running coroutine. * - * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that - * `onNext` is not invoked concurrently. - * - * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. + * Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to + * ensure that [onNext][Subscriber.onNext] is not invoked concurrently. * * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect * to cancellation and error handling may change in the future. + * + * @throws IllegalArgumentException if the provided [context] contains a [Job] instance. */ -@ExperimentalCoroutinesApi public fun <T> flux( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope<T>.() -> Unit @@ -43,40 +38,61 @@ public fun <T> flux( return Flux.from(reactorPublish(GlobalScope, context, block)) } -@Deprecated( - message = "CoroutineScope.flux is deprecated in favour of top-level flux", - level = DeprecationLevel.ERROR, - replaceWith = ReplaceWith("flux(context, block)") -) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring -@LowPriorityInOverloadResolution -public fun <T> CoroutineScope.flux( - context: CoroutineContext = EmptyCoroutineContext, - @BuilderInference block: suspend ProducerScope<T>.() -> Unit -): Flux<T> = - Flux.from(reactorPublish(this, context, block)) - private fun <T> reactorPublish( scope: CoroutineScope, context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope<T>.() -> Unit -): Publisher<T> = Publisher { subscriber -> - // specification requires NPE on null subscriber - if (subscriber == null) throw NullPointerException("Subscriber cannot be null") - require(subscriber is CoreSubscriber) { "Subscriber is not an instance of CoreSubscriber, context can not be extracted." } +): Publisher<T> = Publisher onSubscribe@{ subscriber: Subscriber<in T>? -> + if (subscriber !is CoreSubscriber) { + subscriber.reject(IllegalArgumentException("Subscriber is not an instance of CoreSubscriber, context can not be extracted.")) + return@onSubscribe + } val currentContext = subscriber.currentContext() - val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext() + val reactorContext = context.extendReactorContext(currentContext) val newContext = scope.newCoroutineContext(context + reactorContext) val coroutine = PublisherCoroutine(newContext, subscriber, REACTOR_HANDLER) subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions coroutine.start(CoroutineStart.DEFAULT, coroutine, block) } -private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { e, ctx -> - if (e !is CancellationException) { +private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { cause, ctx -> + if (cause !is CancellationException) { try { - Operators.onOperatorError(e, ctx[ReactorContext]?.context ?: Context.empty()) + Operators.onOperatorError(cause, ctx[ReactorContext]?.context ?: Context.empty()) } catch (e: Throwable) { - handleCoroutineException(ctx, e) + cause.addSuppressed(e) + handleCoroutineException(ctx, cause) } } } + +/** The proper way to reject the subscriber, according to + * [the reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.9) + */ +private fun <T> Subscriber<T>?.reject(t: Throwable) { + if (this == null) + throw NullPointerException("The subscriber can not be null") + onSubscribe(object: Subscription { + override fun request(n: Long) { + // intentionally left blank + } + override fun cancel() { + // intentionally left blank + } + }) + onError(t) +} + +/** + * @suppress + */ +@Deprecated( + message = "CoroutineScope.flux is deprecated in favour of top-level flux", + level = DeprecationLevel.HIDDEN, + replaceWith = ReplaceWith("flux(context, block)") +) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring +public fun <T> CoroutineScope.flux( + context: CoroutineContext = EmptyCoroutineContext, + @BuilderInference block: suspend ProducerScope<T>.() -> Unit +): Flux<T> = + Flux.from(reactorPublish(this, context, block)) |