diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt')
-rw-r--r-- | reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt | 36 |
1 files changed, 19 insertions, 17 deletions
diff --git a/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt index d3942a16..6031e0a8 100644 --- a/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt @@ -7,41 +7,43 @@ package kotlinx.coroutines.jdk9 import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.asFlow -import kotlinx.coroutines.reactive.asPublisher +import kotlinx.coroutines.reactive.asPublisher as asReactivePublisher import kotlinx.coroutines.reactive.collect +import kotlinx.coroutines.channels.* import org.reactivestreams.* import kotlin.coroutines.* import java.util.concurrent.Flow as JFlow /** - * Transforms the given reactive [Publisher] into [Flow]. - * Use [buffer] operator on the resulting flow to specify the size of the backpressure. - * More precisely, it specifies the value of the subscription's [request][Subscription.request]. - * [buffer] default capacity is used by default. + * Transforms the given reactive [Flow Publisher][JFlow.Publisher] into [Flow]. + * Use the [buffer] operator on the resulting flow to specify the size of the back-pressure. + * In effect, it specifies the value of the subscription's [request][JFlow.Subscription.request]. + * The [default buffer capacity][Channel.BUFFERED] for a suspending channel is used by default. * - * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements - * are discarded. + * If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight + * elements are discarded. */ public fun <T : Any> JFlow.Publisher<T>.asFlow(): Flow<T> = - FlowAdapters.toPublisher(this).asFlow() + FlowAdapters.toPublisher(this).asFlow() /** - * Transforms the given flow to a reactive specification compliant [Publisher]. + * Transforms the given flow into a reactive specification compliant [Flow Publisher][JFlow.Publisher]. * - * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods. - * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to + * An optional [context] can be specified to control the execution context of calls to the [Flow Subscriber][Subscriber] + * methods. + * A [CoroutineDispatcher] can be set to confine them to a specific thread; various [ThreadContextElement] can be set to * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ @JvmOverloads // binary compatibility -public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> { - val reactivePublisher : org.reactivestreams.Publisher<T> = this.asPublisher<T>(context) - return FlowAdapters.toFlowPublisher(reactivePublisher) -} +public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> = + FlowAdapters.toFlowPublisher(asReactivePublisher(context)) /** - * Subscribes to this [Publisher] and performs the specified action for each received element. - * Cancels subscription if any exception happens during collect. + * Subscribes to this [Flow Publisher][JFlow.Publisher] and performs the specified action for each received element. + * + * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from + * [collect]. Also, if the publisher signals an error, that error is rethrown from [collect]. */ public suspend inline fun <T> JFlow.Publisher<T>.collect(action: (T) -> Unit): Unit = FlowAdapters.toPublisher(this).collect(action) |