aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt')
-rw-r--r--reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt36
1 files changed, 19 insertions, 17 deletions
diff --git a/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt
index d3942a16..6031e0a8 100644
--- a/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt
+++ b/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt
@@ -7,41 +7,43 @@ package kotlinx.coroutines.jdk9
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.asFlow
-import kotlinx.coroutines.reactive.asPublisher
+import kotlinx.coroutines.reactive.asPublisher as asReactivePublisher
import kotlinx.coroutines.reactive.collect
+import kotlinx.coroutines.channels.*
import org.reactivestreams.*
import kotlin.coroutines.*
import java.util.concurrent.Flow as JFlow
/**
- * Transforms the given reactive [Publisher] into [Flow].
- * Use [buffer] operator on the resulting flow to specify the size of the backpressure.
- * More precisely, it specifies the value of the subscription's [request][Subscription.request].
- * [buffer] default capacity is used by default.
+ * Transforms the given reactive [Flow Publisher][JFlow.Publisher] into [Flow].
+ * Use the [buffer] operator on the resulting flow to specify the size of the back-pressure.
+ * In effect, it specifies the value of the subscription's [request][JFlow.Subscription.request].
+ * The [default buffer capacity][Channel.BUFFERED] for a suspending channel is used by default.
*
- * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements
- * are discarded.
+ * If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight
+ * elements are discarded.
*/
public fun <T : Any> JFlow.Publisher<T>.asFlow(): Flow<T> =
- FlowAdapters.toPublisher(this).asFlow()
+ FlowAdapters.toPublisher(this).asFlow()
/**
- * Transforms the given flow to a reactive specification compliant [Publisher].
+ * Transforms the given flow into a reactive specification compliant [Flow Publisher][JFlow.Publisher].
*
- * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
- * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
+ * An optional [context] can be specified to control the execution context of calls to the [Flow Subscriber][Subscriber]
+ * methods.
+ * A [CoroutineDispatcher] can be set to confine them to a specific thread; various [ThreadContextElement] can be set to
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
@JvmOverloads // binary compatibility
-public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> {
- val reactivePublisher : org.reactivestreams.Publisher<T> = this.asPublisher<T>(context)
- return FlowAdapters.toFlowPublisher(reactivePublisher)
-}
+public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> =
+ FlowAdapters.toFlowPublisher(asReactivePublisher(context))
/**
- * Subscribes to this [Publisher] and performs the specified action for each received element.
- * Cancels subscription if any exception happens during collect.
+ * Subscribes to this [Flow Publisher][JFlow.Publisher] and performs the specified action for each received element.
+ *
+ * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
+ * [collect]. Also, if the publisher signals an error, that error is rethrown from [collect].
*/
public suspend inline fun <T> JFlow.Publisher<T>.collect(action: (T) -> Unit): Unit =
FlowAdapters.toPublisher(this).collect(action)