aboutsummaryrefslogtreecommitdiffstats
path: root/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
diff options
context:
space:
mode:
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt')
-rw-r--r--reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt69
1 files changed, 44 insertions, 25 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
index 614e9ead..1a527a3c 100644
--- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
+++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
@@ -13,15 +13,16 @@ import kotlinx.coroutines.intrinsics.*
import org.reactivestreams.*
import java.util.*
import kotlin.coroutines.*
+import kotlinx.coroutines.internal.*
/**
* Transforms the given reactive [Publisher] into [Flow].
- * Use [buffer] operator on the resulting flow to specify the size of the backpressure.
- * More precisely, it specifies the value of the subscription's [request][Subscription.request].
- * [buffer] default capacity is used by default.
+ * Use the [buffer] operator on the resulting flow to specify the size of the back-pressure.
+ * In effect, it specifies the value of the subscription's [request][Subscription.request].
+ * The [default buffer capacity][Channel.BUFFERED] for a suspending channel is used by default.
*
- * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements
- * are discarded.
+ * If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight
+ * elements are discarded.
*
* This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
* see its documentation for additional details.
@@ -30,13 +31,13 @@ public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
PublisherAsFlow(this)
/**
- * Transforms the given flow to a reactive specification compliant [Publisher].
+ * Transforms the given flow into a reactive specification compliant [Publisher].
*
* This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
* see its documentation for additional details.
*
- * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
- * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
+ * An optional [context] can be specified to control the execution context of calls to the [Subscriber] methods.
+ * A [CoroutineDispatcher] can be set to confine them to a specific thread; various [ThreadContextElement] can be set to
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
@@ -54,8 +55,8 @@ private class PublisherAsFlow<T : Any>(
PublisherAsFlow(publisher, context, capacity, onBufferOverflow)
/*
- * Suppress for Channel.CHANNEL_DEFAULT_CAPACITY.
- * It's too counter-intuitive to be public and moving it to Flow companion
+ * The @Suppress is for Channel.CHANNEL_DEFAULT_CAPACITY.
+ * It's too counter-intuitive to be public, and moving it to Flow companion
* will also create undesired effect.
*/
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
@@ -112,7 +113,7 @@ private class PublisherAsFlow<T : Any>(
collectImpl(scope.coroutineContext, SendingCollector(scope.channel))
}
-@Suppress("SubscriberImplementation")
+@Suppress("ReactiveStreamsSubscriberImplementation")
private class ReactiveSubscriber<T : Any>(
capacity: Int,
onBufferOverflow: BufferOverflow,
@@ -124,11 +125,15 @@ private class ReactiveSubscriber<T : Any>(
// be reliable with rendezvous channel, so a rendezvous channel is replaced with buffer=1 channel
private val channel = Channel<T>(if (capacity == Channel.RENDEZVOUS) 1 else capacity, onBufferOverflow)
- suspend fun takeNextOrNull(): T? = channel.receiveOrNull()
+ suspend fun takeNextOrNull(): T? {
+ val result = channel.receiveCatching()
+ result.exceptionOrNull()?.let { throw it }
+ return result.getOrElse { null } // Closed channel
+ }
override fun onNext(value: T) {
// Controlled by requestSize
- require(channel.offer(value)) { "Element $value was not added to channel because it was full, $channel" }
+ require(channel.trySend(value).isSuccess) { "Element $value was not added to channel because it was full, $channel" }
}
override fun onComplete() {
@@ -184,9 +189,15 @@ public class FlowSubscription<T>(
@JvmField public val flow: Flow<T>,
@JvmField public val subscriber: Subscriber<in T>,
context: CoroutineContext
-) : Subscription, AbstractCoroutine<Unit>(context, true) {
+) : Subscription, AbstractCoroutine<Unit>(context, initParentJob = false, true) {
+ /*
+ * We deliberately set initParentJob to false and do not establish parent-child
+ * relationship because FlowSubscription doesn't support it
+ */
private val requested = atomic(0L)
private val producer = atomic<Continuation<Unit>?>(createInitialContinuation())
+ @Volatile
+ private var cancellationRequested = false
// This code wraps startCoroutineCancellable into continuation
private fun createInitialContinuation(): Continuation<Unit> = Continuation(coroutineContext) {
@@ -196,18 +207,25 @@ public class FlowSubscription<T>(
private suspend fun flowProcessing() {
try {
consumeFlow()
- subscriber.onComplete()
- } catch (e: Throwable) {
- try {
- if (e is CancellationException) {
- subscriber.onComplete()
- } else {
- subscriber.onError(e)
+ } catch (cause: Throwable) {
+ @Suppress("INVISIBLE_MEMBER")
+ val unwrappedCause = unwrap(cause)
+ if (!cancellationRequested || isActive || unwrappedCause !== getCancellationException()) {
+ try {
+ subscriber.onError(cause)
+ } catch (e: Throwable) {
+ // Last ditch report
+ cause.addSuppressed(e)
+ handleCoroutineException(coroutineContext, cause)
}
- } catch (e: Throwable) {
- // Last ditch report
- handleCoroutineException(coroutineContext, e)
}
+ return
+ }
+ // We only call this if `consumeFlow()` finished successfully
+ try {
+ subscriber.onComplete()
+ } catch (e: Throwable) {
+ handleCoroutineException(coroutineContext, e)
}
}
@@ -231,6 +249,7 @@ public class FlowSubscription<T>(
}
override fun cancel() {
+ cancellationRequested = true
cancel(null)
}
@@ -243,7 +262,7 @@ public class FlowSubscription<T>(
if (old <= 0L) {
assert(old == 0L)
// Emitter is not started yet or has suspended -- spin on race with suspendCancellableCoroutine
- while(true) {
+ while (true) {
val producer = producer.getAndSet(null) ?: continue // spin if not set yet
producer.resume(Unit)
break