aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-reactor/src/Flux.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-reactor/src/Flux.kt')
-rw-r--r--reactive/kotlinx-coroutines-reactor/src/Flux.kt77
1 files changed, 77 insertions, 0 deletions
diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
new file mode 100644
index 00000000..18b84ac1
--- /dev/null
+++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt
@@ -0,0 +1,77 @@
+
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
+
+package kotlinx.coroutines.reactor
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.reactive.*
+import org.reactivestreams.Publisher
+import reactor.core.CoreSubscriber
+import reactor.core.publisher.*
+import kotlin.coroutines.*
+import kotlin.internal.LowPriorityInOverloadResolution
+
+/**
+ * Creates cold reactive [Flux] that runs a given [block] in a coroutine.
+ * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
+ * Coroutine emits items with `send`. Unsubscribing cancels running coroutine.
+ *
+ * Coroutine context can be specified with [context] argument.
+ * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
+ *
+ * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
+ * `onNext` is not invoked concurrently.
+ *
+ * | **Coroutine action** | **Signal to subscriber**
+ * | -------------------------------------------- | ------------------------
+ * | `send` | `onNext`
+ * | Normal completion or `close` without cause | `onComplete`
+ * | Failure with exception or `close` with cause | `onError`
+ *
+ * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
+ *
+ * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
+ * to cancellation and error handling may change in the future.
+ */
+@ExperimentalCoroutinesApi
+public fun <T> flux(
+ context: CoroutineContext = EmptyCoroutineContext,
+ @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Flux<T> {
+ require(context[Job] === null) { "Flux context cannot contain job in it." +
+ "Its lifecycle should be managed via Disposable handle. Had $context" }
+ return Flux.from(reactorPublish(GlobalScope, context, block))
+}
+
+@Deprecated(
+ message = "CoroutineScope.flux is deprecated in favour of top-level flux",
+ level = DeprecationLevel.WARNING,
+ replaceWith = ReplaceWith("flux(context, block)")
+) // Since 1.3.0, will be error in 1.3.1 and hidden in 1.4.0. Binary compatibility with Spring
+@LowPriorityInOverloadResolution
+public fun <T> CoroutineScope.flux(
+ context: CoroutineContext = EmptyCoroutineContext,
+ @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Flux<T> =
+ Flux.from(reactorPublish(this, context, block))
+
+private fun <T> reactorPublish(
+ scope: CoroutineScope,
+ context: CoroutineContext = EmptyCoroutineContext,
+ @BuilderInference block: suspend ProducerScope<T>.() -> Unit
+): Publisher<T> = Publisher { subscriber ->
+ // specification requires NPE on null subscriber
+ if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
+ require(subscriber is CoreSubscriber) { "Subscriber is not an instance of CoreSubscriber, context can not be extracted." }
+ val currentContext = subscriber.currentContext()
+ val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext()
+ val newContext = scope.newCoroutineContext(context + reactorContext)
+ val coroutine = PublisherCoroutine(newContext, subscriber)
+ subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
+ coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+} \ No newline at end of file