aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-reactor/src/Flux.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-reactor/src/Flux.kt')
-rw-r--r--reactive/kotlinx-coroutines-reactor/src/Flux.kt86
1 files changed, 51 insertions, 35 deletions
diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
index 8d4f9cc9..1e408d83 100644
--- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt
+++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
@@ -1,10 +1,7 @@
-
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
-@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
-
package kotlinx.coroutines.reactor
import kotlinx.coroutines.*
@@ -15,25 +12,23 @@ import reactor.core.*
import reactor.core.publisher.*
import reactor.util.context.*
import kotlin.coroutines.*
-import kotlin.internal.*
/**
- * Creates cold reactive [Flux] that runs a given [block] in a coroutine.
+ * Creates a cold reactive [Flux] that runs the given [block] in a coroutine.
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
- * Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
- * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
- * if coroutine throws an exception or closes channel with a cause.
- * Unsubscribing cancels running coroutine.
+ * The coroutine emits ([Subscriber.onNext]) values with [send][ProducerScope.send], completes ([Subscriber.onComplete])
+ * when the coroutine completes, or, in case the coroutine throws an exception or the channel is closed,
+ * emits the error ([Subscriber.onError]) and closes the channel with the cause.
+ * Unsubscribing cancels the running coroutine.
*
- * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
- * `onNext` is not invoked concurrently.
- *
- * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
+ * Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to
+ * ensure that [onNext][Subscriber.onNext] is not invoked concurrently.
*
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
* to cancellation and error handling may change in the future.
+ *
+ * @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
*/
-@ExperimentalCoroutinesApi
public fun <T> flux(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
@@ -43,40 +38,61 @@ public fun <T> flux(
return Flux.from(reactorPublish(GlobalScope, context, block))
}
-@Deprecated(
- message = "CoroutineScope.flux is deprecated in favour of top-level flux",
- level = DeprecationLevel.ERROR,
- replaceWith = ReplaceWith("flux(context, block)")
-) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring
-@LowPriorityInOverloadResolution
-public fun <T> CoroutineScope.flux(
- context: CoroutineContext = EmptyCoroutineContext,
- @BuilderInference block: suspend ProducerScope<T>.() -> Unit
-): Flux<T> =
- Flux.from(reactorPublish(this, context, block))
-
private fun <T> reactorPublish(
scope: CoroutineScope,
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
-): Publisher<T> = Publisher { subscriber ->
- // specification requires NPE on null subscriber
- if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
- require(subscriber is CoreSubscriber) { "Subscriber is not an instance of CoreSubscriber, context can not be extracted." }
+): Publisher<T> = Publisher onSubscribe@{ subscriber: Subscriber<in T>? ->
+ if (subscriber !is CoreSubscriber) {
+ subscriber.reject(IllegalArgumentException("Subscriber is not an instance of CoreSubscriber, context can not be extracted."))
+ return@onSubscribe
+ }
val currentContext = subscriber.currentContext()
- val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext()
+ val reactorContext = context.extendReactorContext(currentContext)
val newContext = scope.newCoroutineContext(context + reactorContext)
val coroutine = PublisherCoroutine(newContext, subscriber, REACTOR_HANDLER)
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
-private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { e, ctx ->
- if (e !is CancellationException) {
+private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { cause, ctx ->
+ if (cause !is CancellationException) {
try {
- Operators.onOperatorError(e, ctx[ReactorContext]?.context ?: Context.empty())
+ Operators.onOperatorError(cause, ctx[ReactorContext]?.context ?: Context.empty())
} catch (e: Throwable) {
- handleCoroutineException(ctx, e)
+ cause.addSuppressed(e)
+ handleCoroutineException(ctx, cause)
}
}
}
+
+/** The proper way to reject the subscriber, according to
+ * [the reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.9)
+ */
+private fun <T> Subscriber<T>?.reject(t: Throwable) {
+ if (this == null)
+ throw NullPointerException("The subscriber can not be null")
+ onSubscribe(object: Subscription {
+ override fun request(n: Long) {
+ // intentionally left blank
+ }
+ override fun cancel() {
+ // intentionally left blank
+ }
+ })
+ onError(t)
+}
+
+/**
+ * @suppress
+ */
+@Deprecated(
+ message = "CoroutineScope.flux is deprecated in favour of top-level flux",
+ level = DeprecationLevel.HIDDEN,
+ replaceWith = ReplaceWith("flux(context, block)")
+) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring
+public fun <T> CoroutineScope.flux(
+ context: CoroutineContext = EmptyCoroutineContext,
+ @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Flux<T> =
+ Flux.from(reactorPublish(this, context, block))