diff options
Diffstat (limited to 'kotlinx-coroutines-core/common/src/flow/Channels.kt')
-rw-r--r-- | kotlinx-coroutines-core/common/src/flow/Channels.kt | 53 |
1 files changed, 28 insertions, 25 deletions
diff --git a/kotlinx-coroutines-core/common/src/flow/Channels.kt b/kotlinx-coroutines-core/common/src/flow/Channels.kt index e883c3b4..5cc8ad8b 100644 --- a/kotlinx-coroutines-core/common/src/flow/Channels.kt +++ b/kotlinx-coroutines-core/common/src/flow/Channels.kt @@ -30,7 +30,8 @@ public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Uni emitAllImpl(channel, consume = true) private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) { - // Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveOrClosed". + ensureActive() + // Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveCatching". // It has smaller and more efficient spilled state which also allows to implement a manual kludge to // fix retention of the last emitted value. // See https://youtrack.jetbrains.com/issue/KT-16222 @@ -47,9 +48,9 @@ private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, // L$1 <- channel // L$2 <- cause // L$3 <- this$run (actually equal to this) - val result = run { channel.receiveOrClosed() } + val result = run { channel.receiveCatching() } if (result.isClosed) { - result.closeCause?.let { throw it } + result.exceptionOrNull()?.let { throw it } break // returns normally when result.closeCause == null } // result is spilled here to the coroutine state and retained after the call, even though @@ -58,7 +59,7 @@ private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, // L$1 <- channel // L$2 <- cause // L$3 <- result - emit(result.value) + emit(result.getOrThrow()) } } catch (e: Throwable) { cause = e @@ -133,17 +134,12 @@ private class ChannelAsFlow<T>( override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> = ChannelAsFlow(channel, consume, context, capacity, onBufferOverflow) - override fun dropChannelOperators(): Flow<T>? = + override fun dropChannelOperators(): Flow<T> = ChannelAsFlow(channel, consume) override suspend fun collectTo(scope: ProducerScope<T>) = SendingCollector(scope).emitAllImpl(channel, consume) // use efficient channel receiving code from emitAll - override fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> { - markConsumed() // fail fast on repeated attempt to collect it - return super.broadcastImpl(scope, start) - } - override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> { markConsumed() // fail fast on repeated attempt to collect it return if (capacity == Channel.OPTIONAL_CHANNEL) { @@ -173,22 +169,16 @@ private class ChannelAsFlow<T>( * 2) Flow consumer completes normally when the original channel completes (~is closed) normally. * 3) If the flow consumer fails with an exception, subscription is cancelled. */ -@FlowPreview +@Deprecated( + level = DeprecationLevel.WARNING, + message = "'BroadcastChannel' is obsolete and all coreresponding operators are deprecated " + + "in the favour of StateFlow and SharedFlow" +) // Since 1.5.0, was @FlowPreview, safe to remove in 1.7.0 public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow { emitAll(openSubscription()) } /** - * Creates a [broadcast] coroutine that collects the given flow. - * - * This transformation is **stateful**, it launches a [broadcast] coroutine - * that collects the given flow and thus resulting channel should be properly closed or cancelled. - * - * A channel with [default][Channel.Factory.BUFFERED] buffer size is created. - * Use [buffer] operator on the flow before calling `broadcastIn` to specify a value other than - * default and to control what happens when data is produced faster than it is consumed, - * that is to control backpressure behavior. - * * ### Deprecated * * **This API is deprecated.** The [BroadcastChannel] provides a complex channel-like API for hot flows. @@ -202,13 +192,26 @@ public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow { @Deprecated( message = "Use shareIn operator and the resulting SharedFlow as a replacement for BroadcastChannel", replaceWith = ReplaceWith("this.shareIn(scope, SharingStarted.Lazily, 0)"), - level = DeprecationLevel.WARNING -) + level = DeprecationLevel.ERROR +) // WARNING in 1.4.0, error in 1.5.0, removed in 1.6.0 (was @FlowPreview) public fun <T> Flow<T>.broadcastIn( scope: CoroutineScope, start: CoroutineStart = CoroutineStart.LAZY -): BroadcastChannel<T> = - asChannelFlow().broadcastImpl(scope, start) +): BroadcastChannel<T> { + // Backwards compatibility with operator fusing + val channelFlow = asChannelFlow() + val capacity = when (channelFlow.onBufferOverflow) { + BufferOverflow.SUSPEND -> channelFlow.produceCapacity + BufferOverflow.DROP_OLDEST -> Channel.CONFLATED + BufferOverflow.DROP_LATEST -> + throw IllegalArgumentException("Broadcast channel does not support BufferOverflow.DROP_LATEST") + } + return scope.broadcast(channelFlow.context, capacity = capacity, start = start) { + collect { value -> + send(value) + } + } +} /** * Creates a [produce] coroutine that collects the given flow. |