diff options
Diffstat (limited to 'reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt')
-rw-r--r-- | reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt | 69 |
1 files changed, 44 insertions, 25 deletions
diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index 614e9ead..1a527a3c 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -13,15 +13,16 @@ import kotlinx.coroutines.intrinsics.* import org.reactivestreams.* import java.util.* import kotlin.coroutines.* +import kotlinx.coroutines.internal.* /** * Transforms the given reactive [Publisher] into [Flow]. - * Use [buffer] operator on the resulting flow to specify the size of the backpressure. - * More precisely, it specifies the value of the subscription's [request][Subscription.request]. - * [buffer] default capacity is used by default. + * Use the [buffer] operator on the resulting flow to specify the size of the back-pressure. + * In effect, it specifies the value of the subscription's [request][Subscription.request]. + * The [default buffer capacity][Channel.BUFFERED] for a suspending channel is used by default. * - * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements - * are discarded. + * If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight + * elements are discarded. * * This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module, * see its documentation for additional details. @@ -30,13 +31,13 @@ public fun <T : Any> Publisher<T>.asFlow(): Flow<T> = PublisherAsFlow(this) /** - * Transforms the given flow to a reactive specification compliant [Publisher]. + * Transforms the given flow into a reactive specification compliant [Publisher]. * * This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module, * see its documentation for additional details. * - * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods. - * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to + * An optional [context] can be specified to control the execution context of calls to the [Subscriber] methods. + * A [CoroutineDispatcher] can be set to confine them to a specific thread; various [ThreadContextElement] can be set to * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ @@ -54,8 +55,8 @@ private class PublisherAsFlow<T : Any>( PublisherAsFlow(publisher, context, capacity, onBufferOverflow) /* - * Suppress for Channel.CHANNEL_DEFAULT_CAPACITY. - * It's too counter-intuitive to be public and moving it to Flow companion + * The @Suppress is for Channel.CHANNEL_DEFAULT_CAPACITY. + * It's too counter-intuitive to be public, and moving it to Flow companion * will also create undesired effect. */ @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") @@ -112,7 +113,7 @@ private class PublisherAsFlow<T : Any>( collectImpl(scope.coroutineContext, SendingCollector(scope.channel)) } -@Suppress("SubscriberImplementation") +@Suppress("ReactiveStreamsSubscriberImplementation") private class ReactiveSubscriber<T : Any>( capacity: Int, onBufferOverflow: BufferOverflow, @@ -124,11 +125,15 @@ private class ReactiveSubscriber<T : Any>( // be reliable with rendezvous channel, so a rendezvous channel is replaced with buffer=1 channel private val channel = Channel<T>(if (capacity == Channel.RENDEZVOUS) 1 else capacity, onBufferOverflow) - suspend fun takeNextOrNull(): T? = channel.receiveOrNull() + suspend fun takeNextOrNull(): T? { + val result = channel.receiveCatching() + result.exceptionOrNull()?.let { throw it } + return result.getOrElse { null } // Closed channel + } override fun onNext(value: T) { // Controlled by requestSize - require(channel.offer(value)) { "Element $value was not added to channel because it was full, $channel" } + require(channel.trySend(value).isSuccess) { "Element $value was not added to channel because it was full, $channel" } } override fun onComplete() { @@ -184,9 +189,15 @@ public class FlowSubscription<T>( @JvmField public val flow: Flow<T>, @JvmField public val subscriber: Subscriber<in T>, context: CoroutineContext -) : Subscription, AbstractCoroutine<Unit>(context, true) { +) : Subscription, AbstractCoroutine<Unit>(context, initParentJob = false, true) { + /* + * We deliberately set initParentJob to false and do not establish parent-child + * relationship because FlowSubscription doesn't support it + */ private val requested = atomic(0L) private val producer = atomic<Continuation<Unit>?>(createInitialContinuation()) + @Volatile + private var cancellationRequested = false // This code wraps startCoroutineCancellable into continuation private fun createInitialContinuation(): Continuation<Unit> = Continuation(coroutineContext) { @@ -196,18 +207,25 @@ public class FlowSubscription<T>( private suspend fun flowProcessing() { try { consumeFlow() - subscriber.onComplete() - } catch (e: Throwable) { - try { - if (e is CancellationException) { - subscriber.onComplete() - } else { - subscriber.onError(e) + } catch (cause: Throwable) { + @Suppress("INVISIBLE_MEMBER") + val unwrappedCause = unwrap(cause) + if (!cancellationRequested || isActive || unwrappedCause !== getCancellationException()) { + try { + subscriber.onError(cause) + } catch (e: Throwable) { + // Last ditch report + cause.addSuppressed(e) + handleCoroutineException(coroutineContext, cause) } - } catch (e: Throwable) { - // Last ditch report - handleCoroutineException(coroutineContext, e) } + return + } + // We only call this if `consumeFlow()` finished successfully + try { + subscriber.onComplete() + } catch (e: Throwable) { + handleCoroutineException(coroutineContext, e) } } @@ -231,6 +249,7 @@ public class FlowSubscription<T>( } override fun cancel() { + cancellationRequested = true cancel(null) } @@ -243,7 +262,7 @@ public class FlowSubscription<T>( if (old <= 0L) { assert(old == 0L) // Emitter is not started yet or has suspended -- spin on race with suspendCancellableCoroutine - while(true) { + while (true) { val producer = producer.getAndSet(null) ?: continue // spin if not set yet producer.resume(Unit) break |